Tutorial

Simple usage

The simplest case is distributed RPC: a client sends a request that is handled by one of several servers.

graph simple {
   rankdir=LR;
   node[shape=box];
   subgraph cluster_servers {
     rank=sink; style=dashed;
     Server1[label="RPC Server 1"];
     Server2[label="RPC Server 2"];
     ServerDots[label="...", shape=none];
     ServerN[label="RPC Server N"];
     }
   { rank=source; Client[label="RPC Client"]; }
   AMQPServer[label="AMQP Server", shape=ellipse];
   Client -- AMQPServer;
   AMQPServer -- Server1[label="One call"];
   AMQPServer -- Server2[label="Another call"];
   AMQPServer -- ServerN;
}

All you need to do is create a server instance and a client instance. When amphora connects to the AMQP server, it creates all the queues and exchanges it needs.

Let’s start with the server.

Amphora lets you run multiple RPC services in one AMQP virtual host, so you should specify a namespace for each service. In our example the RPC servers perform simple arithmetic operations, so we name the namespace “math”.

#!/usr/bin/env python
from amphora import AmqpRpcServer
server = AmqpRpcServer('math')
server.serve(nowait=False)

Now open the RabbitMQ management web interface and you’ll see that amphora has created the exchanges rpc_math_request and rpc_math_response. It has also created a queue named rpc_math_request and bound it to the exchange with the same name.

Add some functions to the RPC server:

#!/usr/bin/env python
from math import sqrt
from amphora import AmqpRpcServer
server = AmqpRpcServer('math')

@server.add_function
def sum_numbers(*args):
    return sum(args)

@server.add_function
def hypotenuse(cathetus1, cathetus2):
    return sqrt(cathetus1 ** 2 + cathetus2 ** 2)

server.serve(nowait=False)

That’s all, our server is ready! You can start as many server processes as you want.

Now we will write a simple client. The client must use the same namespace as the server.

#!/usr/bin/env python
from amphora import AmqpRpcClient
client = AmqpRpcClient('math')

import gevent; gevent.sleep(1)  # Let amphora create all queues

Look at the RabbitMQ management interface again. Another queue has been created with a name like rpc_math_somemagicstring_response. It is bound to the rpc_math_response exchange with the routing key somemagicstring.

Now we can make requests:
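The exchange and queue names seen in the management interface follow a visible pattern. The helpers below are only a sketch of that pattern as inferred from this example. Their names are hypothetical, they are not part of amphora, and the library itself may build the names differently.

```python
# Hypothetical helpers that only mirror the names observed in this tutorial.

def request_name(namespace):
    """Name shared by the request exchange and the request queue."""
    return "rpc_{0}_request".format(namespace)


def response_exchange_name(namespace):
    """Name of the exchange that carries responses back to clients."""
    return "rpc_{0}_response".format(namespace)


def response_queue_name(namespace, routing_key):
    """Per-client response queue; routing_key is the client's unique string."""
    return "rpc_{0}_{1}_response".format(namespace, routing_key)


print(request_name("math"))
print(response_exchange_name("math"))
print(response_queue_name("math", "somemagicstring"))
```

Because each client gets its own response queue and routing key, a response can be routed back to the client that made the request.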

#!/usr/bin/env python
from amphora import AmqpRpcClient
client = AmqpRpcClient('math')

import gevent; gevent.sleep(1)  # Let amphora create all queues

print client.call.sum_numbers(1, 2, 3, 4)  # Prints 10
print client.call.hypotenuse(3, 4)  # Prints 5.0

Amphora allows you to handle remote exceptions:

#!/usr/bin/env python
from amphora import AmqpRpcClient
client = AmqpRpcClient('math')

import gevent; gevent.sleep(1)

print client.call.hypotenuse("foo", "bar")
# Traceback:
#   ...
# RemoteException: TypeError: unsupported operand type(s) for ** or pow(): 'str' and 'int'
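How a server-side error could end up as the message above can be pictured with the following self-contained sketch. The RemoteException class here is a local stand-in, and the way the message text is built is an assumption based on the traceback shown, not amphora's actual implementation.

```python
from math import sqrt


class RemoteException(Exception):
    """Local stand-in for the exception the client sees."""


def hypotenuse(cathetus1, cathetus2):
    return sqrt(cathetus1 ** 2 + cathetus2 ** 2)


def run_on_server(func, *args):
    """Run func; on failure report '<ErrorType>: <message>' as text."""
    try:
        return True, func(*args)
    except Exception as exc:
        return False, "{0}: {1}".format(type(exc).__name__, exc)


def call_from_client(func, *args):
    """Return the result, or raise RemoteException with the server's error text."""
    ok, payload = run_on_server(func, *args)
    if not ok:
        raise RemoteException(payload)
    return payload


try:
    call_from_client(hypotenuse, "foo", "bar")
except RemoteException as exc:
    error_text = str(exc)
    print("Remote call failed: {0}".format(error_text))
```

In real client code the same try/except would presumably wrap the client.call.hypotenuse(...) call, catching RemoteException from wherever amphora exposes it.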

Asynchronous calls

Often you don’t want to wait until a remote call has finished. Maybe you want to make requests in parallel, or you don’t care about the result of the remote call.

Let’s create a server function that can take several seconds to run:

from urllib2 import build_opener, URLError
from amphora import AmqpRpcServer

server = AmqpRpcServer('example')

@server.add_function
def check_is_down(site):
    opener = build_opener()
    try:
        opener.open(site)
    except URLError:
        return "Site {0} is down".format(site)
    return "Site {0} is up".format(site)

server.serve(nowait=False)

Client implementation:

from amphora import AmqpRpcClient

client = AmqpRpcClient('example')

# All of these calls are executed asynchronously
results = [client.defer.check_is_down(site) for site in (
    'http://google.com', 'http://yandex.com', 'http://example.com',
    'http://thereisnosuchsitename.com')]

# Now you can do some stuff
do_another_work()

# Time to check results
for result in results:
    print result.get()
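The start-now, collect-later pattern used above can be tried locally without an AMQP server. The sketch below is only an analogy: it swaps in a thread pool from Python 3's standard library (concurrent.futures) for amphora and gevent, and slow_check is a hypothetical stand-in for a slow remote function.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_check(site):
    """Hypothetical stand-in for a slow remote function."""
    time.sleep(0.1)
    return "Site {0} is up".format(site)


sites = ["http://google.com", "http://yandex.com", "http://example.com"]

with ThreadPoolExecutor(max_workers=len(sites)) as pool:
    # submit() returns immediately, much like the deferred calls above
    futures = [pool.submit(slow_check, site) for site in sites]
    # ... other work could happen here while the checks run ...
    # result() blocks until that particular call has finished
    messages = [future.result() for future in futures]

for message in messages:
    print(message)
```

Results are collected in the order the calls were started, regardless of which one finishes first.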

Different request queues

Here is a more complex example. You have a distributed web service that serves multiple users. One part of the service (“API”) communicates with the user via WebSockets or long polling. Another part of the service (“Core”) contains all the business logic but does not interact with the user directly.

Suppose “Core” wants to show the user Alice a message immediately, or wants her to fill in a form.

You have many “API” servers, and each user can be served by only one “API” server at a time. “Core” does not know which of the “API” servers is interacting with Alice and which is interacting with Bob.

digraph queues {
    rankdir=LR; node[shape=box]; size=10;

    alice[label="Alice", shape=doublecircle]

    subgraph apis {
      api1[label="API server 1"];
      api2[label="API server 2"];
      api3[label="API server 3"];
    }

    amqp[label="AMQP server", shape=ellipse];

    core[label="Core server"];

    alice -> api1[label="Request can be handled\nby this server...", style=dashed];
    alice -> api2[label="Or by this one...", style=dashed];
    alice -> api3[label="Or by this one...", style=dashed];

    api1 -> amqp [arrowhead=none];
    api2 -> amqp [arrowhead=none];
    api3 -> amqp [arrowhead=none];

    amqp -> core [arrowhead=none];

    core -> alice [arrowhead=onormal, style=dotted,
        label="How to send\nmessage to Alice?"];
}

AmqpRpcServer can create request queues on demand and remove them when they are no longer needed.

At startup, “API” creates an RPC server. When a user requests new messages, “API” calls the method amphora.AmqpRpcServer.receive_from_queue() and waits until an event occurs.

Meanwhile, when the RPC server receives a request to send a message to the user, it triggers the event and the waiting user request wakes up.

# views.py
import gevent
from handlers import server, events

def user_get_message(request):
    username = request.user.username
    try:
        server.receive_from_queue(
            username, max_calls=1, timeout=10, block=True)
    except gevent.Timeout:
        return {'ok': True, 'message': None}
    message = events.pop(username)
    return {'ok': True, 'message': message}


# handlers.py
from amphora import AmqpRpcServer

server = AmqpRpcServer("messages")
events = {}

@server.add_function
def show_message(username, message):
    events[username] = message

server.serve()

If the queue does not exist, amphora.AmqpRpcServer.receive_from_queue() will silently wait until it is created.

Before sending messages, the client should create a request queue for the user:

client.create_new_request_queue("Alice")

When “Core” wants to send a message to the user, it just calls:
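The wait-and-wake-up flow between the view and the handler can be pictured locally with the sketch below. It is only an analogy that uses Python 3's standard queue and threading modules in place of amphora's per-user request queues. The mailbox helpers are hypothetical, and the return values mirror the view shown above.

```python
import queue
import threading

mailboxes = {}
mailboxes_lock = threading.Lock()


def mailbox_for(username):
    """Create the user's mailbox on demand, like an on-demand request queue."""
    with mailboxes_lock:
        return mailboxes.setdefault(username, queue.Queue())


def show_message(username, message):
    """Handler side: deliver a message and wake up any waiting request."""
    mailbox_for(username).put(message)


def user_get_message(username, timeout):
    """View side: wait for a message, or give up after timeout seconds."""
    try:
        message = mailbox_for(username).get(timeout=timeout)
    except queue.Empty:
        return {'ok': True, 'message': None}
    return {'ok': True, 'message': message}


show_message("Alice", "Hello, Alice!")
print(user_get_message("Alice", timeout=0.1))
print(user_get_message("Bob", timeout=0.05))
```

A request for a user with no pending message simply times out and returns an empty result, which suits long polling: the browser can issue the same request again.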

client.defer(routing_key="Alice").show_message(
     "Alice", "Hello, Alice!")

Alternatively, you can register a function with amphora.AmqpRpcClient.tune_function() that determines the routing key:

@client.tune_function('show_message')
def message_tune_function(args, kwargs):
    if args:
        return args[0]
    return kwargs.get('username')

client.defer.show_message("Alice", "Goodbye, Alice!") # Routing key "Alice"
client.call.show_message(username="Bob", message="Goodbye, Bob!")  # Routing key "Bob"
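The routing logic of the tune function can be checked on its own, without a client or a broker. The sketch below repeats the function body from above and assumes, as its signature suggests, that it receives the positional arguments as a sequence and the keyword arguments as a dict.

```python
def message_tune_function(args, kwargs):
    """Same logic as above: first positional argument, else 'username'."""
    if args:
        return args[0]
    return kwargs.get('username')


# Positional call: show_message("Alice", "Goodbye, Alice!")
key_positional = message_tune_function(("Alice", "Goodbye, Alice!"), {})

# Keyword call: show_message(username="Bob", message="Goodbye, Bob!")
key_keyword = message_tune_function(
    (), {"username": "Bob", "message": "Goodbye, Bob!"})

# Neither form supplies a user name, so the function returns None
key_missing = message_tune_function((), {"message": "Hello"})

print(key_positional, key_keyword, key_missing)
```

The last case shows a gap worth guarding against: if a caller passes neither a positional argument nor username, the function returns None instead of a routing key.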

Broadcasting requests

If you want each request to be executed on all server instances, use AmqpRpcFanoutServer and AmqpRpcFanoutClient.

A handler function on a fanout server can ignore a request by raising amphora.IgnoreRequest.

The current implementation cannot collect the results from every server. If you call amphora.AmqpRpcFanoutClient.call() and every server returns a result, only the first response delivered is returned and the other responses are lost.
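The behaviour described above, where every server sees the request but only the first response is kept, can be simulated in plain Python. The sketch below does not use amphora: IgnoreRequest is a local stand-in, the handlers are hypothetical, and the delivery order is fixed by the list, whereas with real servers it depends on timing.

```python
class IgnoreRequest(Exception):
    """Local stand-in for amphora.IgnoreRequest."""


def make_handler(name, ignore=False):
    """Build a hypothetical handler that answers or ignores every request."""
    def handler(request):
        if ignore:
            raise IgnoreRequest()
        return "{0} handled {1}".format(name, request)
    return handler


def fanout_call(handlers, request):
    """Deliver request to every handler; keep only the first response."""
    responses = []
    for handler in handlers:
        try:
            responses.append(handler(request))
        except IgnoreRequest:
            continue  # this server stays silent
    first = responses[0] if responses else None
    lost = responses[1:]
    return first, lost


handlers = [
    make_handler("server1", ignore=True),
    make_handler("server2"),
    make_handler("server3"),
]
first, lost = fanout_call(handlers, "ping")
print(first)
print(len(lost))
```

If the caller needs an answer from one specific server, having the other handlers raise IgnoreRequest keeps their responses from competing with it.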
