This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Automatic Failover
Automatic failover lets an application connect to a clustered broker. An application that uses it can automatically connect to a healthy node and react to the unexpected failure of a node in the cluster.
Connection failover
Connection accepts multiple URLs, one per broker, separated by semicolons. When connecting, Kombu automatically picks a healthy node from the list. In the example below, Kombu connects to the healthy.example.com broker:
>>> from kombu import Connection
>>> conn = Connection(
...     'amqp://guest:guest@broken.example.com;guest:guest@healthy.example.com'
... )
>>> conn.connect()
>>> conn
<Connection: amqp://guest:**@healthy.example.com at 0x6fffff751710>
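The selection behaviour described above can be sketched without a broker. The following is a minimal illustration, not Kombu's implementation: `split_urls`, `first_healthy`, and the `is_up` probe are hypothetical names, and the sketch assumes every alternative is written as a full URL.

```python
def split_urls(url_string):
    """Split a semicolon-separated connection string into candidates."""
    return [part.strip() for part in url_string.split(';') if part.strip()]


def first_healthy(url_string, is_up):
    """Return the first candidate URL for which the is_up probe succeeds.

    is_up is a hypothetical callable standing in for a real connection
    attempt; it returns True when the node accepts connections.
    """
    for url in split_urls(url_string):
        if is_up(url):
            return url
    raise ConnectionError('no healthy broker in: %r' % (url_string,))


# Pretend only the second node is reachable.
reachable = {'amqp://healthy.example.com'}
chosen = first_healthy(
    'amqp://broken.example.com;amqp://healthy.example.com',
    is_up=lambda url: url in reachable,
)
print(chosen)
```

Candidates are tried in the order they are listed, so the first URL acts as the preferred node in this sketch.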
Connection also accepts a failover_strategy parameter, which defines the order in which the nodes are tried:
>>> Connection(
...     'amqp://broker1.example.com;amqp://broker2.example.com',
...     failover_strategy='round-robin'
... )
The available failover strategies are defined in the kombu.connection module:
>>> import kombu
>>> kombu.connection.failover_strategies
{'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle at 0x6fffff8547a0>}
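The mapping above shows that 'round-robin' is simply itertools.cycle. The sketch below shows the order in which hosts would be produced by it. The host names are made up, and `shuffle_cycle_sketch` is only an approximation of what a shuffling strategy might do, not Kombu's shufflecycle function.

```python
import itertools
import random

hosts = ['broker1.example.com', 'broker2.example.com', 'broker3.example.com']

# 'round-robin' maps to itertools.cycle: hosts are tried in the given
# order, wrapping around to the first one after the last.
round_robin = itertools.cycle(hosts)
first_five = list(itertools.islice(round_robin, 5))
print(first_five)


def shuffle_cycle_sketch(items, rng=random):
    """Endlessly yield items in a freshly shuffled order (illustrative only)."""
    pool = list(items)
    while pool:
        rng.shuffle(pool)
        yield from pool


shuffled = list(itertools.islice(shuffle_cycle_sketch(hosts), 6))
print(shuffled)
```

Round-robin gives a predictable order, which is useful when the first URL is the preferred node; a shuffling strategy spreads clients across nodes instead.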
Connection failover only covers failures that occur while the connect() method of Connection is being called.
Operation failover
Passing multiple connection strings to Connection solves the problem of a broker being unavailable when a new connection is created. In practice, however, connections are long-lived, so a broker can fail during the lifetime of a connection. In this scenario, the operation executed against the broker must be retried. Retrying ensures that a failed operation triggers a new connection to a healthy broker and is then executed again.
Operation failover is implemented in the ensure() method, which tries to execute a function. When contacting the broker fails, it reconnects the underlying connection and executes the function again. The following example ensures that the publish() method is re-executed when an error occurs:
>>> import logging
>>> from kombu import Connection, Producer
>>> logger = logging.getLogger(__name__)
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
...     # Called before each retry with the error and the upcoming delay.
...     logger.error('Error: %r', exc, exc_info=1)
...     logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
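The retry-and-reconnect behaviour described above can be illustrated with plain Python. This is a simplified sketch under stated assumptions, not Kombu's ensure(): `ensure_sketch`, `flaky_publish`, and `reconnect` are hypothetical names, the delay between retries is fixed, and only ConnectionError is treated as retryable.

```python
import time


def ensure_sketch(fun, reconnect, errback=None, max_retries=3,
                  interval=0.0, retry_errors=(ConnectionError,)):
    """Wrap fun so that connection errors trigger a reconnect and a retry."""
    def wrapped(*args, **kwargs):
        retries = 0
        while True:
            try:
                return fun(*args, **kwargs)
            except retry_errors as exc:
                if max_retries is not None and retries >= max_retries:
                    raise  # retry budget exhausted; surface the error
                if errback is not None:
                    errback(exc, interval)
                time.sleep(interval)
                reconnect()  # re-establish the connection before retrying
                retries += 1
    return wrapped


# Hypothetical flaky operation: fails twice, then succeeds.
state = {'calls': 0, 'reconnects': 0}
reported = []


def flaky_publish(body):
    state['calls'] += 1
    if state['calls'] <= 2:
        raise ConnectionError('broker went away')
    return 'published %r' % (body,)


def reconnect():
    state['reconnects'] += 1


publish = ensure_sketch(
    flaky_publish, reconnect,
    errback=lambda exc, interval: reported.append(str(exc)),
)
result = publish({'hello': 'world'})
print(result, state)
```

The errback runs once per failed attempt, before the reconnect, which makes it a natural place for logging as in the example above.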
Some methods, such as declare(), accept a channel as a parameter. Because the channel is passed in by the caller, it is not refreshed automatically during failover, so retrying the method call fails. In such cases, use autoretry(), which passes the channel to the method and refreshes it during failover:
>>> import kombu
>>> conn = kombu.Connection('amqp://broker1:5672;amqp://broker2:5672')
>>> conn.connect()
>>> q = kombu.Queue('test_queue')
>>> declare = conn.autoretry(q.declare)
>>> declare()
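To show why a fresh channel matters, the following sketch mimics the idea behind autoretry() in plain Python. It is an illustration only: `autoretry_sketch`, `open_channel`, and the stand-in `declare` function are hypothetical, and channels are represented by plain integers.

```python
def autoretry_sketch(fun, open_channel, max_retries=3,
                     retry_errors=(ConnectionError,)):
    """Call fun with a fresh channel keyword argument on every attempt."""
    def wrapped(*args, **kwargs):
        for attempt in range(max_retries + 1):
            channel = open_channel()  # new channel after each failure
            try:
                return fun(*args, channel=channel, **kwargs), channel
            except retry_errors:
                if attempt == max_retries:
                    raise
    return wrapped


# Hypothetical stand-ins: channel 1 is stale, later channels work.
channel_ids = iter(range(1, 10))


def open_channel():
    return next(channel_ids)


def declare(name, channel=None):
    if channel == 1:
        raise ConnectionError('channel %d is closed' % channel)
    return '%s declared on channel %d' % (name, channel)


retval, used_channel = autoretry_sketch(declare, open_channel)('test_queue')
print(retval, used_channel)
```

The sketch returns a (result, channel) pair. The Kombu API reference describes the callable produced by autoretry() as returning a similar tuple of the return value and the last created channel, so it is worth checking the reference before relying on the bare result.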
Producer
publish() can be given automatic failover with ensure(), as described above. It also accepts a retry parameter as a shortcut for retrying. The following example retries publishing when an error occurs:
>>> from kombu import Connection, Producer
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     with conn.channel() as channel:
...         producer = Producer(channel)
...         # retry=True re-sends the message if the broker connection fails.
...         producer.publish(
...             {'hello': 'world'}, routing_key='queue', retry=True
...         )
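Besides retry, publish() accepts a retry_policy mapping whose keys are the keyword arguments of ensure() (max_retries, interval_start, interval_step, interval_max). The sketch below shows how such a policy might translate into waiting times. It assumes a simple linear back-off capped at interval_max; `retry_intervals` is a hypothetical helper and the exact schedule Kombu uses may differ.

```python
def retry_intervals(interval_start=0, interval_step=2, interval_max=30,
                    max_retries=3):
    """Return the sleep time before each retry, growing linearly."""
    return [
        min(interval_start + attempt * interval_step, interval_max)
        for attempt in range(max_retries)
    ]


# Hypothetical policy values; a dict like this is passed as retry_policy.
policy = {
    'interval_start': 0,   # first retry happens immediately
    'interval_step': 2,    # add 2 seconds for each further retry
    'interval_max': 5,     # never wait longer than 5 seconds
    'max_retries': 5,      # give up after 5 retries
}
schedule = retry_intervals(**policy)
print(schedule)
```

A policy like this would be supplied as `producer.publish(..., retry=True, retry_policy=policy)`.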
Consumer
A consumer with failover can be implemented with the following function:
>>> def consume():
...     while True:
...         try:
...             conn.drain_events(timeout=1)
...         except socket.timeout:
...             pass
This function drains events in an infinite loop, using a timeout so that it does not block forever on a connection to an unavailable broker. To add failover, wrap the consume function with the ensure() method:
>>> consume = conn.ensure(conn, consume)
>>> consume()
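The drain loop above never returns, which makes it awkward to shut down or test. A stoppable variant can be sketched as follows. `FakeConnection` and the `should_stop` callback are hypothetical and exist only so that the loop can run without a broker; with a real connection only the `consume` function would be needed.

```python
import socket


class FakeConnection:
    """Hypothetical stand-in for a connection; not part of Kombu."""

    def __init__(self, events):
        self.events = list(events)
        self.delivered = []

    def drain_events(self, timeout=None):
        if not self.events:
            raise socket.timeout('no event within %s s' % timeout)
        event = self.events.pop(0)
        if event is None:  # None simulates a quiet period
            raise socket.timeout('no event within %s s' % timeout)
        self.delivered.append(event)


def consume(conn, should_stop, timeout=1):
    """Drain events until should_stop() returns True."""
    while not should_stop():
        try:
            conn.drain_events(timeout=timeout)
        except socket.timeout:
            pass  # nothing arrived in time; check the stop flag and retry


conn = FakeConnection(['a', None, 'b'])
consume(conn, should_stop=lambda: not conn.events)
print(conn.delivered)
```

Because the timeout is swallowed inside the loop, only other errors propagate to the caller, and those are the ones a wrapper such as ensure() is meant to react to.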
A full example of a consumer with failover follows:
>>> import socket
>>> from kombu import Connection, Consumer, Queue
>>> def callback(body, message):
...     print(body)
...     message.ack()
>>> queue = Queue('queue', routing_key='queue')
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     def consume():
...         while True:
...             try:
...                 conn.drain_events(timeout=1)
...             except socket.timeout:
...                 pass  # no message within the timeout; keep polling
...     with conn.channel() as channel:
...         consumer = Consumer(channel, queue)
...         consumer.register_callback(callback)
...         with consumer:
...             # Wrap once, under a new name, so the original function
...             # is not wrapped again on every pass through the loop.
...             safe_consume = conn.ensure(conn, consume)
...             while True:
...                 safe_consume()
When the consumer is implemented as a ConsumerMixin, failover is added by wrapping the consume method with ensure():
>>> from kombu import Connection, Queue
>>> from kombu.mixins import ConsumerMixin
>>> class C(ConsumerMixin):
...     def __init__(self, connection):
...         self.connection = connection
...     def get_consumers(self, Consumer, channel):
...         return [
...             Consumer(
...                 [Queue('queue', routing_key='queue')],
...                 callbacks=[self.on_message], accept=['json']
...             ),
...         ]
...     def on_message(self, body, message):
...         print('RECEIVED MESSAGE: {0!r}'.format(body))
...         message.ack()
...     def consume(self, *args, **kwargs):
...         # Use the instance's own connection rather than a global one.
...         consume = self.connection.ensure(self.connection, super().consume)
...         return consume(*args, **kwargs)
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     C(conn).run()