API

Server API

class amphora.AmqpRpcServer(service_name, serializer=<class 'amphora.serializer.RpcJsonSerializer'>, **kwargs)

Server class for AMQP RPC.

Parameters:
  • service_name (str) – Namespace of your RPC service.
  • serializer (AbstractRpcSerializer) – Class for generating AMQP messages. JSON messages are generated by default.
  • amqp_host (str) – IP address or hostname of AMQP server. By default 127.0.0.1.
  • amqp_port (int) – TCP port of AMQP server. By default 5672.
  • amqp_user (str) – Username for authentication in AMQP. By default guest.
  • amqp_password (str) – Password for authentication in AMQP. By default guest.
  • amqp_vhost (str) – Virtual host for authentication in AMQP. By default /.

add_function(func, name=None)

Register a function that handles a remote call. Can be used as a regular method call or as a decorator.

Parameters:
  • func (callable) – Remote call handler.
  • name (str) – Remote call handler name. If omitted, the call has the same name as the handler function.

Example:

# math_server.py
from gevent import monkey; monkey.patch_socket()
from amphora import AmqpRpcServer

server = AmqpRpcServer('simplemath')

def add(x, y):
    return x + y
server.add_function(add)

def subtract(x, y):
    return x - y
server.add_function(subtract, name="sub")

@server.add_function
def multiply(x, y):
    return x * y

@server.add_function('div')
def divide(x, y):
    return x / y

server.serve(nowait=False)


# math_client.py
from gevent import monkey; monkey.patch_socket()
from amphora import AmqpRpcClient

client = AmqpRpcClient('simplemath')
# TODO: must work without __import__('gevent').sleep(1)
print(client.call.add(2, 3))  # 5
print(client.call.sub(10, 3))  # 7
print(client.call.multiply(5, 5))  # 25
print(client.call.div(64, 16))  # 4

A registered function can raise amphora.IgnoreRequest to have the request re-sent to another server.
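The three call styles of add_function shown in the example (plain call, bare decorator, decorator with a name) can be pictured with the stdlib-only sketch below. The Registry class and its internals are hypothetical and are not amphora's implementation; they only illustrate how a single method might support all three styles.

```python
class Registry(object):
    """Hypothetical stand-in for the server's table of handlers."""

    def __init__(self):
        self.handlers = {}

    def add_function(self, func=None, name=None):
        # Used as @add_function('div'): the first argument is the call name,
        # so return a decorator that registers the function under it.
        if isinstance(func, str):
            return lambda f: self.add_function(f, name=func)
        # Used as @add_function(name='div'): no function was passed yet.
        if func is None:
            return lambda f: self.add_function(f, name=name)
        # Plain call or bare decorator: fall back to the function's own name.
        self.handlers[name or func.__name__] = func
        return func  # returning func keeps the decorated name usable


registry = Registry()


def add(x, y):
    return x + y
registry.add_function(add)


@registry.add_function
def multiply(x, y):
    return x * y


@registry.add_function('div')
def divide(x, y):
    return x / y


registered_names = sorted(registry.handlers)
print(registered_names)
```

Returning the original function from the decorator is a design choice of this sketch: it leaves multiply and divide callable locally as well as through the registry.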

receive_from_queue(queue, max_calls=None, timeout=None, block=True, block_until_closed=True)

Receive new calls from specified queue.

amphora.AmqpRpcServer automatically starts receiving from the queue rpc_{service_name}_request, but you can also enable receiving calls from several queues.

Parameters:
  • queue (str) – AMQP queue name with requests.
  • max_calls (int) – Maximum call count. By default unlimited.
  • timeout (float) – Stop receiving from the queue after timeout seconds.
  • block (bool) – Block the current greenlet until the maximum number of calls is reached or the timeout expires.
  • block_until_closed (bool) – Block the current greenlet until all requests are handled and the channel is closed. Ignored if block == False.
Raises:
  • gevent.Timeout – If the timeout expires before the maximum number of calls is reached.
  • ValueError – If the queue is not found.

If you set max_calls, AmqpRpcServer handles only that number of calls. After reaching the maximum, AmqpRpcServer cancels consuming from the queue but does not close the channel until every message being handled is acked or rejected. The same rules apply to timeout. You can specify both parameters at once.

When requests are delivered very frequently, the AMQP server may send some request messages just after the basic_cancel call. These messages are rejected with requeuing. To avoid endless redelivery of messages, which can cause denial of service, AmqpRpcServer can reject messages with a one-second delay.

When max_calls is specified, basic.qos with a corresponding prefetch count is sent before consuming starts.
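How max_calls and timeout combine can be illustrated with a stdlib-only sketch. The receive function below is hypothetical and works on an in-process queue.Queue rather than an AMQP queue; unlike receive_from_queue it returns the number of handled items instead of raising gevent.Timeout. It only shows the stopping rule: whichever limit is hit first ends the loop.

```python
import queue
import time


def receive(q, handler, max_calls=None, timeout=None):
    """Handle items from q until max_calls items are handled or timeout seconds pass."""
    deadline = None if timeout is None else time.monotonic() + timeout
    handled = 0
    while max_calls is None or handled < max_calls:
        remaining = None
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # timed out before reaching max_calls
        try:
            item = q.get(timeout=remaining)
        except queue.Empty:
            break  # nothing arrived before the deadline
        handler(item)
        handled += 1
    return handled


requests = queue.Queue()
for n in range(5):
    requests.put(n)

results = []
first = receive(requests, results.append, max_calls=3)
second = receive(requests, results.append, max_calls=10, timeout=0.1)
```

The first call stops because it reached max_calls; the second stops because the timeout expired with the queue empty.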

serve(nowait=True)

Connect to AMQP and start working.

Parameters:
  • nowait (bool) – If False, block the current greenlet until the server stops. By default (True), don’t block.

prepare_stop()

Reject any new requests. Use this method together with stop() when you want to do a “warm shutdown”.

stop()

Stop the instance and disconnect from the AMQP server. All unsent messages are held until you call serve() again.

stop_publisher()

Stop publishing messages. Consuming continues working (if it was not stopped earlier).

stop_consumer()

Stop consuming messages. Publishing continues working (if it was not stopped earlier).

class amphora.AmqpRpcFanoutServer(service_name, serializer=<class 'amphora.serializer.RpcJsonSerializer'>, **kwargs)

Fanout server class for AMQP RPC.

Fanout exchanges in AMQP are used for broadcasting. If you execute a remote function via AmqpRpcFanoutClient, the request is handled by every connected AmqpRpcFanoutServer.

One difference from AmqpRpcServer is how amphora.IgnoreRequest is handled. When a registered function raises it, the server sends “ack” for the request message but does not send any result or exception back to the client.

Client API

class amphora.AmqpRpcClient(service_name, uuid=None, serializer=<class 'amphora.serializer.RpcJsonSerializer'>, timeout=60, autostart=True, **kwargs)

Client class for AMQP RPC.

Parameters:
  • service_name (str) – Namespace of your RPC service.
  • uuid (str) – Unique identifier for the current client instance. If omitted, a random id is generated.
  • serializer (AbstractRpcSerializer) – Class for generating AMQP messages. JSON messages are generated by default.
  • timeout (float) – Default timeout for calls. By default 60.
  • amqp_host (str) – IP address or hostname of AMQP server. By default 127.0.0.1.
  • amqp_port (int) – TCP port of AMQP server. By default 5672.
  • amqp_user (str) – Username for authentication in AMQP. By default guest.
  • amqp_password (str) – Password for authentication in AMQP. By default guest.
  • amqp_vhost (str) – Virtual host for authentication in AMQP. By default /.

defer

Call a remote procedure without blocking the current greenlet.

Parameters:
  • function_name (str) – You can specify function name directly as string.
  • routing_key (str) – Routing key for remote call.
  • wait_publish (bool) – Wait until the request message is published.
Returns:
  • Helper object for calling remote procedures.
Return type:
  • amphora.PrettyCaller

When the helper object is called, it returns a gevent.event.AsyncResult instance. When the remote function completes, its return value is stored in the AsyncResult.

You can specify the routing key for a call in two ways: pass routing_key to defer, or register a tuning function with tune_function().

Examples:

client = amphora.AmqpRpcClient("example")

# Remote call of function with name "send_email"
async_result = client.defer.send_email('root@example.com', "Hello!")
print(async_result.get())

# Remote call of function with name "show_message"
# with custom routing key
client.create_new_request_queue("user12345")
client.defer(routing_key="user12345").show_message(
     "Hello, user12345!")

# Remote call of the function named "send_message.sms"
client.defer.send_message.sms("+12345678901", "Message")

# You can use helper objects like any normal python objects.
# This code calls "send_message.email.text"
# and "send_message.email.html"
email_sender = client.defer.send_message.email
email_sender.text("root@example.com", "Message")
email_sender.html("user@example.com", "Message")

# If you don't want to generate the function name with the helper
for fmt in ("html", "text"):
    client.defer(function_name="send_message.email." + fmt)(
        "root@example.com", "Hello!")

call

Call a remote procedure, blocking the current greenlet.

Parameters:
  • function_name (str) – You can specify function name directly as string.
  • timeout (float) – Timeout for waiting for the result of the remote call. If not specified, the timeout passed to the constructor is used.
  • routing_key (str) – Routing key for remote call.
Raises:
  • amphora.NoResult – If the request times out.
Returns:
  • Helper object for calling remote procedures.
Return type:
  • amphora.PrettyCaller

When the helper object is called, it waits until the remote function completes and returns its result.

See the defer documentation for examples and explanations.
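The helper objects returned by defer and call turn attribute access into a dotted function name. A minimal sketch of that idea follows; DottedCaller and its send callback are hypothetical names for illustration and do not describe how amphora.PrettyCaller is actually written.

```python
class DottedCaller(object):
    """Hypothetical helper that collects attribute names into a dotted name."""

    def __init__(self, send, name=""):
        self._send = send
        self._name = name

    def __getattr__(self, part):
        # __getattr__ runs only for attributes that are not found normally,
        # so _send and _name never reach this method.
        if part.startswith("_"):
            raise AttributeError(part)
        full_name = part if not self._name else self._name + "." + part
        return DottedCaller(self._send, full_name)

    def __call__(self, *args, **kwargs):
        # Hand the accumulated name and the call arguments to the sender.
        return self._send(self._name, args, kwargs)


sent = []
caller = DottedCaller(lambda name, args, kwargs: sent.append((name, args, kwargs)))

email_sender = caller.send_message.email
email_sender.text("root@example.com", "Message")
email_sender.html("user@example.com", "Message")

sent_names = [name for name, args, kwargs in sent]
print(sent_names)
```

Because every attribute access returns a new helper, an intermediate object such as email_sender can be stored and reused, which matches the usage shown in the defer examples.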

create_new_request_queue(queue, routing_key=None, x_expires=None, nowait=False)

Asynchronously create a new queue and bind it to the request exchange.

Parameters:
  • queue (str) – Queue label that will be used for generating queue name.
  • routing_key (str) – Routing key for binding to request exchange.
  • x_expires (int or None) – Queue TTL in milliseconds (RabbitMQ x-expires argument). See http://www.rabbitmq.com/ttl.html
  • nowait (bool) – If True, don’t block the current greenlet until the queue is created and bound.

If routing_key is not specified, the routing key is the same as the queue argument.

The queue name is generated from the template rpc_{queue}_request.

For example, create_new_request_queue('test') creates the queue rpc_test_request and binds it to the request exchange with the routing key test.
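The naming rule can be written down as a short sketch. The function names request_queue_name and binding_for are hypothetical helpers, not part of amphora; only the template rpc_{queue}_request and the routing key fallback come from the text above.

```python
def request_queue_name(queue):
    # Template quoted in the documentation: rpc_{queue}_request
    return "rpc_{queue}_request".format(queue=queue)


def binding_for(queue, routing_key=None):
    """Return (queue name, routing key) for a queue label."""
    # The routing key falls back to the queue label when it is omitted.
    if routing_key is None:
        routing_key = queue
    return request_queue_name(queue), routing_key


default_binding = binding_for("test")
custom_binding = binding_for("user12345", routing_key="users")
```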

remove_request_queue(queue, nowait=False)

Asynchronously remove a queue.

Parameters:
  • queue (str) – Queue label that will be used for generating queue name.
  • nowait (bool) – If True, don’t block the current greenlet until the queue is deleted.

The queue name is generated the same way as in create_new_request_queue().

Warning

If you call remove_request_queue() and then immediately call create_new_request_queue(), the queue is deleted but may not be re-created.

tune_function(func_name)

Set the function that calculates the queue and routing key for the specified remote function from the arguments passed to the call.

Your tuning function should return a dict with two keys: "queue" and "routing_key" (the "queue" key is deprecated).
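The way a tuning function feeds into a call can be sketched without amphora. Everything below (the tuners table, this tune_function, and routing_for) is a hypothetical stand-in for the client internals; it only shows that the tuning function receives the positional and keyword arguments as two plain objects and that its "routing_key" entry is what gets used.

```python
tuners = {}  # hypothetical table: remote function name -> tuning function


def tune_function(func_name):
    """Return a registrar that stores a tuning function for func_name."""
    def register(tuner):
        tuners[func_name] = tuner
        return tuner
    return register


def routing_for(func_name, *args, **kwargs):
    """Compute the routing key a call would use, or None if untuned."""
    tuner = tuners.get(func_name)
    if tuner is None:
        return None
    # The tuner gets args and kwargs as two objects, without * or **.
    return tuner(args, kwargs)["routing_key"]


def calculate(args, kwargs):
    user_id = str(kwargs["user_id"])
    return {"queue": user_id, "routing_key": user_id}


tune_function("show_message")(calculate)

tuned_key = routing_for("show_message", "Hello!", user_id=12345)
untuned_key = routing_for("send_email", "root@example.com")
```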

Example:

from amphora import AmqpRpcClient

def calculate(args, kwargs):  # Note, no * or **
    user_id = str(kwargs['user_id'])
    return {'queue': user_id, 'routing_key': user_id}

client = AmqpRpcClient('test')
client.tune_function('show_message')(calculate)

# Will be called with routing key "12345"
client.defer.show_message("Hello!", user_id=12345)

serve(nowait=True)

Connect to AMQP and start working.

Parameters:
  • nowait (bool) – If False, block the current greenlet until the instance stops. By default (True), don’t block.

stop()

Stop the instance and disconnect from the AMQP server. All unsent messages are held until you call serve() again.

stop_publisher()

Stop publishing messages. Consuming continues working (if it was not stopped earlier).

stop_consumer()

Stop consuming messages. Publishing continues working (if it was not stopped earlier).

class amphora.AmqpRpcFanoutClient(service_name, uuid=None, serializer=<class 'amphora.serializer.RpcJsonSerializer'>, timeout=60, autostart=True, **kwargs)

Client for AmqpRpcFanoutServer. API is the same as for AmqpRpcClient.

Exceptions

exception amphora.NoResult

Raised by amphora.AmqpRpcClient.call when the request times out.

exception amphora.WrongRequest

Raised by amphora.AmqpRpcClient.call, or by the get() method of the gevent.event.AsyncResult returned by amphora.AmqpRpcClient.defer, when the server can’t parse the request. For example, when you call a function that is not registered on the server.

exception amphora.WrongResult

Raised by amphora.AmqpRpcClient.call, or by the get() method of the gevent.event.AsyncResult returned by amphora.AmqpRpcClient.defer, when the response can’t be parsed. Normally you will never get this exception.

exception amphora.RemoteException(error_type, args)

Raised by amphora.AmqpRpcClient.call, or by the get() method of the gevent.event.AsyncResult returned by amphora.AmqpRpcClient.defer, when the remote function raises an exception.

Parameters:
  • error_type (str) – Class name of remote exception.
  • args (tuple) – Arguments passed to remote exception.
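The mapping from a server-side exception to error_type and args can be pictured with a stdlib-only sketch. RemoteError and call_remote are hypothetical stand-ins that run in one process; they are not amphora's classes and skip serialization entirely.

```python
class RemoteError(Exception):
    """Hypothetical stand-in for amphora.RemoteException."""

    def __init__(self, error_type, args):
        # Passing the remote arguments on keeps them available as .args.
        super(RemoteError, self).__init__(*args)
        self.error_type = error_type


def call_remote(func, *args):
    """Run func and re-raise any failure as RemoteError."""
    try:
        return func(*args)
    except Exception as exc:
        # Only the class name and the arguments of the exception travel back.
        raise RemoteError(type(exc).__name__, exc.args)


def divide(x, y):
    if x < 0:
        raise ValueError('x', x)
    return x / y


try:
    call_remote(divide, -5, 5)
except RemoteError as exc:
    negative_case = (exc.error_type, exc.args)

try:
    call_remote(divide, 10, 0)
except RemoteError as exc:
    zero_case = exc.error_type
```

Carrying the class name as a string is what lets the caller branch on error_type without having the server's exception classes available.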

Example:

# server.py
@server.add_function
def divide(x, y):
    if x < 0:
        raise ValueError('x', x)
    elif y < 0:
        raise ValueError('y', y)
    else:
        return x / y

# client.py
from amphora import RemoteException
client.call.divide(6, 3)  # Returns 2

try:
    client.call.divide(-5, 5)
except RemoteException as exc:
    print(exc.error_type)  # Prints "ValueError"
    print(exc.args)  # Prints '("x", -5)'

try:
    client.call.divide(10, 0)
except RemoteException as exc:
    print(exc.error_type)  # Prints "ZeroDivisionError"

exception amphora.IgnoreRequest

Rejects the request when raised in AmqpRpcServer, or ignores it when raised in AmqpRpcFanoutServer.
