This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Common Utilities - kombu.common
¶
Common Utilities.
- class kombu.common.Broadcast(name=None, queue=None, unique=False, auto_delete=True, exchange=None, alias=None, **kwargs)[source]¶
Broadcast queue.
Convenience class used to define broadcast queues.
Every queue instance will have a unique name, and both the queue and exchange is configured with auto deletion.
- Parameters
name (str) – This is used as the name of the exchange.
queue (str) – By default a unique id is used for the queue name for every consumer. You can specify a custom queue name here.
unique (bool) – Always create a unique queue even if a queue name is supplied.
**kwargs (Any) – See
Queue
for a list of additional keyword arguments supported.
- attrs = (('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', <class 'bool'>), ('exclusive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('no_ack', None), ('alias', None), ('bindings', <class 'list'>), ('no_declare', <class 'bool'>), ('expires', <class 'float'>), ('message_ttl', <class 'float'>), ('max_length', <class 'int'>), ('max_length_bytes', <class 'int'>), ('max_priority', <class 'int'>), ('queue', None))¶
- kombu.common.collect_replies(conn, channel, queue, *args, **kwargs)[source]¶
Generator collecting replies from
queue
.
- kombu.common.drain_consumer(consumer, limit=1, timeout=None, callbacks=None)[source]¶
Drain messages from consumer instance.
- kombu.common.eventloop(conn, limit=None, timeout=None, ignore_timeouts=False)[source]¶
Best practice generator wrapper around
Connection.drain_events
.Able to drain events forever, with a limit, and optionally ignoring timeout errors (a timeout of 1 is often used in environments where the socket can get “stuck”, and is a best practice for Kombu consumers).
eventloop
is a generator.Examples
>>> from kombu.common import eventloop
>>> def run(conn): ... it = eventloop(conn, timeout=1, ignore_timeouts=True) ... next(it) # one event consumed, or timed out. ... ... for _ in eventloop(conn, timeout=1, ignore_timeouts=True): ... pass # loop forever.
It also takes an optional limit parameter, and timeout errors are propagated by default:
for _ in eventloop(connection, limit=1, timeout=1): pass
See also
itermessages()
, which is an event loop bound to one or more consumers, that yields any messages received.
- kombu.common.insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)[source]¶
Function wrapper to handle connection errors.
Ensures function performing broker commands completes despite intermittent connection failures.
- kombu.common.itermessages(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs)[source]¶
Iterator over messages.
- kombu.common.maybe_declare(entity, channel=None, retry=False, **retry_policy)[source]¶
Declare entity (cached).
- kombu.common.send_reply(exchange, req, msg, producer=None, retry=False, retry_policy=None, **props)[source]¶
Send reply for request.
- Parameters
exchange (kombu.Exchange, str) – Reply exchange
req (Message) – Original request, a message with a
reply_to
property.producer (kombu.Producer) – Producer instance
retry (bool) – If true must retry according to the
reply_policy
argument.retry_policy (Dict) – Retry settings.
**props (Any) – Extra properties.