==========
 Tutorial
==========

Simple usage
============

The simplest case is distributed RPC: a client sends a request that is handled
by one of several servers.

.. graphviz::

    graph simple {
        rankdir=LR;
        node[shape=box];
        subgraph cluster_servers {
            rank=sink;
            style=dashed;
            Server1[label="RPC Server 1"];
            Server2[label="RPC Server 2"];
            ServerDots[label="...", shape=none];
            ServerN[label="RPC Server N"];
        }
        {
            rank=source;
            Client[label="RPC Client"];
        }
        AMQPServer[label="AMQP Server", shape=ellipse];
        Client -- AMQPServer;
        AMQPServer -- Server1[label="One call"];
        AMQPServer -- Server2[label="Another call"];
        AMQPServer -- ServerN;
    }

All you need to do is create a server instance and a client instance. When
amphora connects to the AMQP server, it creates all the queues and exchanges it
needs.

Let's start with the server. Amphora allows you to have multiple RPC services
in one AMQP virtual host, so you should specify a namespace name for each of
your services. In our example the RPC servers will do simple arithmetic
operations, so we name our namespace `"math"`.

.. code-block:: python

    #!/usr/bin/env python
    from amphora import AmqpRpcServer

    server = AmqpRpcServer('math')
    server.serve(nowait=False)

Now you can visit the RabbitMQ management web interface and you'll see that
amphora has created the exchanges ``rpc_math_request`` and
``rpc_math_response``. A queue ``rpc_math_request`` was also created and bound
to the exchange with the same name.

Add some functions to the RPC server:

.. code-block:: python

    #!/usr/bin/env python
    from math import sqrt
    from amphora import AmqpRpcServer

    server = AmqpRpcServer('math')

    @server.add_function
    def sum_numbers(*args):
        return sum(args)

    @server.add_function
    def hypotenuse(cathetus1, cathetus2):
        return sqrt(cathetus1 ** 2 + cathetus2 ** 2)

    server.serve(nowait=False)

That's all, our server is ready! You can start as many server processes as you
want.

Now let's write a simple client. We need to specify the same namespace in both
the server and the client.

.. code-block:: python

    #!/usr/bin/env python
    from amphora import AmqpRpcClient

    client = AmqpRpcClient('math')
    import gevent; gevent.sleep(1)  # Let amphora create all queues

Look again at the RabbitMQ management interface. Another queue was created,
with a name like ``rpc_math_somemagicstring_response``. It is bound to
``rpc_math_response`` with the routing key ``somemagicstring``.

.. When you do a remote call, amphora sends a request message to the exchange
   ``rpc_math_request``. The AMQP server stores this message in the queue
   ``rpc_math_request``, from which the server takes the request. When the
   request is done, the server publishes a message with the result to the
   exchange ``rpc_math_response`` with the routing key ``somemagicstring``.
   RabbitMQ stores this message in the queue
   ``rpc_math_somemagicstring_response``.

Now we can make requests:

.. code-block:: python

    #!/usr/bin/env python
    from amphora import AmqpRpcClient

    client = AmqpRpcClient('math')
    import gevent; gevent.sleep(1)  # Let amphora create all queues

    print client.call.sum_numbers(1, 2, 3, 4)
    # Prints 10
    print client.call.hypotenuse(3, 4)
    # Prints 5.0

Amphora allows you to handle remote exceptions:

.. code-block:: python

    #!/usr/bin/env python
    from amphora import AmqpRpcClient

    client = AmqpRpcClient('math')
    import gevent; gevent.sleep(1)

    print client.call.hypotenuse("foo", "bar")
    # Traceback:
    # ...
    # RemoteException: TypeError: unsupported operand type(s) for ** or pow(): 'str' and 'int'
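If the client should keep running when a remote call fails, catch the exception
around the call. The sketch below assumes ``RemoteException`` can be imported
from the top-level ``amphora`` package; check the actual import path in your
version.

.. code-block:: python

    #!/usr/bin/env python
    # The RemoteException import path is an assumption.
    from amphora import AmqpRpcClient, RemoteException

    client = AmqpRpcClient('math')
    import gevent; gevent.sleep(1)  # Let amphora create all queues

    try:
        print client.call.hypotenuse("foo", "bar")
    except RemoteException as exc:
        # The server-side TypeError is wrapped and re-raised on the client.
        print "Remote call failed: {0}".format(exc)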
Asynchronous calls
==================

Often you don't want to wait until a remote call has finished. Maybe you want
to make parallel requests, or maybe you don't care about the result at all.

Let's create a server function that can take several seconds to complete:

.. code-block:: python

    from urllib2 import build_opener, URLError
    from amphora import AmqpRpcServer

    server = AmqpRpcServer('example')

    @server.add_function
    def check_is_down(site):
        opener = build_opener()
        try:
            opener.open(site)
        except URLError:
            return "Site {0} is down".format(site)
        return "Site {0} is up".format(site)

    server.serve(nowait=False)

The client implementation:

.. code-block:: python

    from amphora import AmqpRpcClient

    client = AmqpRpcClient('example')

    # All of these are executed asynchronously
    results = [client.defer.check_is_down(site) for site in (
        'http://google.com', 'http://yandex.com',
        'http://example.com', 'http://thereisnosuchsitename.com')]

    # Now you can do some other work
    do_another_work()

    # Time to check the results
    for result in results:
        print result.get()
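Nothing stops you from wrapping this scatter/gather pattern in a small helper
of your own. The function below is hypothetical (it is not part of amphora) and
uses only the calls shown above:

.. code-block:: python

    from amphora import AmqpRpcClient

    def check_sites(client, sites):
        # Fire off all requests first so they run in parallel,
        # then block on each result in turn.
        deferred = [(site, client.defer.check_is_down(site)) for site in sites]
        return dict((site, result.get()) for site, result in deferred)

    client = AmqpRpcClient('example')
    statuses = check_sites(client, ['http://google.com', 'http://example.com'])
    for site, status in statuses.items():
        print site, '->', status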
Different request queues
========================

A more complex example. You have a distributed web service that serves many
users. One part of the service (`"API"`) communicates with users via WebSockets
or long polling. Another part (`"Core"`) contains all the business logic but
does not interact with users directly. `"Core"` wants to show the user Alice a
message right away, or wants her to fill in a form. You have many `"API"`
servers, and each user can be served by only one `"API"` server at a time.
`"Core"` does not know which of the `"API"` servers is interacting with Alice
and which with Bob.

.. graphviz::

    digraph queues {
        rankdir=LR;
        node[shape=box];
        size=10;
        alice[label="Alice", shape=doublecircle];
        subgraph apis {
            api1[label="API server 1"];
            api2[label="API server 2"];
            api3[label="API server 3"];
        }
        amqp[label="AMQP server", shape=ellipse];
        core[label="Core server"];
        alice -> api1[label="Request can be handled\nby this server...", style=dashed];
        alice -> api2[label="Or by this one...", style=dashed];
        alice -> api3[label="Or by this one...", style=dashed];
        api1 -> amqp [arrowhead=none];
        api2 -> amqp [arrowhead=none];
        api3 -> amqp [arrowhead=none];
        amqp -> core [arrowhead=none];
        core -> alice [arrowhead=onormal, style=dotted, label="How to send\na message to Alice?"];
    }

`AmqpRpcServer` can create request queues on demand and remove them when they
are no longer needed. At startup `"API"` creates an RPC server. When a user
requests new messages, `"API"` calls
:py:func:`amphora.AmqpRpcServer.receive_from_queue` and waits until some event
occurs. Meanwhile, when the RPC server gets a request to send a message to that
user, it triggers the event and the user's request wakes up.

.. code-block:: python

    # views.py
    import gevent

    from handlers import server, events

    def user_get_message(request):
        username = request.user.username
        try:
            # Wait up to 10 seconds for one incoming RPC request
            # addressed to this user's queue.
            server.receive_from_queue(
                username, max_calls=1, timeout=10, block=True)
        except gevent.Timeout:
            return {'ok': True, 'message': None}
        message = events.pop(username)
        return {'ok': True, 'message': message}

    # handlers.py
    from amphora import AmqpRpcServer

    server = AmqpRpcServer("messages")
    events = {}

    @server.add_function
    def show_message(username, message):
        events[username] = message

    server.serve()

If the queue does not exist, :py:func:`amphora.AmqpRpcServer.receive_from_queue`
silently waits until it is created. Before sending messages, the client should
create a request queue for the user:

.. code-block:: python

    client.create_new_request_queue("Alice")

When `"Core"` wants to send a message to the user, it just calls:

.. code-block:: python

    client.defer(routing_key="Alice").show_message(
        "Alice", "Hello, Alice!")

Or you can use :py:meth:`amphora.AmqpRpcClient.tune_function`, which will
determine the routing key for you:

.. code-block:: python

    @client.tune_function('show_message')
    def message_tune_function(args, kwargs):
        if args:
            return args[0]
        return kwargs.get('username')

    client.defer.show_message("Alice", "Goodbye, Alice!")
    # Routing key "Alice"
    client.call.show_message(username="Bob", message="Goodbye, Bob!")
    # Routing key "Bob"

Broadcasting requests
=====================

If you want every request to be executed on all server instances, use
:py:class:`~amphora.AmqpRpcFanoutServer` and
:py:class:`~amphora.AmqpRpcFanoutClient`. A handler function in a fanout server
can ignore a request by raising :py:exc:`amphora.IgnoreRequest`.

The current implementation can't collect the results from all servers. If you
execute :py:meth:`amphora.AmqpRpcFanoutClient.call` and every server returns a
result, only the first delivered response is returned; the other responses are
lost.
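As an illustration, a fanout setup might look like the sketch below. It assumes
that the fanout classes take a namespace argument and expose ``add_function``,
``serve`` and attribute-style calls the same way as `AmqpRpcServer` and
`AmqpRpcClient` do; check the API reference for the exact signatures.

.. code-block:: python

    # fanout_server.py -- run one copy per instance.
    # Constructor and add_function usage are assumed to mirror AmqpRpcServer.
    from amphora import AmqpRpcFanoutServer, IgnoreRequest

    server = AmqpRpcFanoutServer('cache')
    local_cache = {}

    @server.add_function
    def invalidate(key):
        if key not in local_cache:
            # Nothing to do on this instance, so don't answer at all.
            raise IgnoreRequest()
        del local_cache[key]
        return "invalidated"

    server.serve(nowait=False)

.. code-block:: python

    # fanout_client.py -- the request is broadcast to every server instance,
    # but only the first delivered response comes back to the caller.
    from amphora import AmqpRpcFanoutClient

    client = AmqpRpcFanoutClient('cache')
    print client.call.invalidate('user:42')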