This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Redis Transport - kombu.transport.redis

Redis transport module for Kombu.

Features

  • Type: Virtual

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: Yes

  • Supports TTL: No

Connection String

Connection string has the following format:

redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery, the connection string has following format:

sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options

  • sep

  • ack_emulation: (bool) If set to True transport will simulate Acknowledge of AMQP protocol.

  • unacked_key

  • unacked_index_key

  • unacked_mutex_key

  • unacked_mutex_expire

  • visibility_timeout

  • unacked_restore_limit

  • fanout_prefix

  • fanout_patterns

  • global_keyprefix: (str) The global key prefix to be prepended to all keys used by Kombu

  • socket_timeout

  • socket_connect_timeout

  • socket_keepalive

  • socket_keepalive_options

  • queue_order_strategy

  • max_connections

  • health_check_interval

  • retry_on_timeout

  • priority_steps

Transport

class kombu.transport.redis.Transport(*args, **kwargs)[source]

Redis Transport.

class Channel(*args, **kwargs)

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

pipe_or_acquire(pipe=None, client=None)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True

If disabled, unacked messages won’t be restored at shutdown.

restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)

Restore all unacknowledged messages.

restore_visible(start=0, num=10, interval=10)

Restore any pending unackwnowledged messages.

To be filled in for visibility_timeout style implementations.

Note

This is implementation optional, and currently only used by the Redis transport.

property unacked_index_key
property unacked_key
property unacked_mutex_expire
property unacked_mutex_key
property visibility_timeout
ack_emulation = True
property active_queues

Set of queues being consumed from (excluding fanout queues).

property async_pool
basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, *args, **kwargs)

Consume from queue.

property client

Client used to publish messages, BRPOP etc.

close()

Close channel.

Cancel all consumers, and requeue unacked messages.

conn_or_acquire(client=None)
connection_class

alias of redis.connection.Connection

connection_class_ssl

alias of redis.connection.SSLConnection

fanout_patterns = True

If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

fanout_prefix = True

Transport option to disable fanout keyprefix. Can also be string, in which case it changes the default prefix (‘/{db}.’) into to something else. The prefix must include a leading slash and a trailing dot.

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps')
get_table(exchange)

Get table of bindings for exchange.

global_keyprefix = ''

The global key prefix will be prepended to all keys used by Kombu, which can be useful when a redis database is shared by different users. By default, no prefix is prepended.

health_check_interval = 25
keyprefix_fanout = '/{db}.'
keyprefix_queue = '_kombu.binding.%s'
max_connections = 10
property pool
priority(n)
priority_steps = [0, 3, 6, 9]
queue_order_strategy = 'round_robin'

Order in which we consume from queues.

Can be either string alias, or a cycle strategy class

  • round_robin (round_robin_cycle).

    Make sure each queue has an equal opportunity to be consumed from.

  • sorted (sorted_cycle).

    Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.

  • priority (priority_cycle).

    Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.

The default is to consume from queues in round robin.

retry_on_timeout = None
sep = '\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
property subclient

Pub/Sub connection used to consume fanout queues.

supports_fanout = True

flag set if the channel supports fanout exchanges.

unacked_index_key = 'unacked_index'
unacked_key = 'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = 'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600
channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'redis.exceptions.DataError'>, <class 'redis.exceptions.InvalidResponse'>, <class 'redis.exceptions.ResponseError'>)

Tuple of errors that can happen due to channel/method failure.

connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)

Tuple of errors that can happen due to connection failure.

default_port = 6379

port number used when no port is specified.

driver_name = 'redis'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'redis'

Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…

driver_version()[source]
implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
on_readable(fileno)[source]

Handle AIO event for one of our file descriptors.

polling_interval = None

Time to sleep between unsuccessful polls.

register_with_event_loop(connection, loop)[source]

Channel

class kombu.transport.redis.Channel(*args, **kwargs)[source]

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

pipe_or_acquire(pipe=None, client=None)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True

If disabled, unacked messages won’t be restored at shutdown.

restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)

Restore all unacknowledged messages.

restore_visible(start=0, num=10, interval=10)

Restore any pending unackwnowledged messages.

To be filled in for visibility_timeout style implementations.

Note

This is implementation optional, and currently only used by the Redis transport.

property unacked_index_key
property unacked_key
property unacked_mutex_expire
property unacked_mutex_key
property visibility_timeout
ack_emulation = True
property active_queues

Set of queues being consumed from (excluding fanout queues).

property async_pool
basic_cancel(consumer_tag)[source]

Cancel consumer by consumer tag.

basic_consume(queue, *args, **kwargs)[source]

Consume from queue.

property client

Client used to publish messages, BRPOP etc.

close()[source]

Close channel.

Cancel all consumers, and requeue unacked messages.

conn_or_acquire(client=None)[source]
connection_class

alias of redis.connection.Connection

connection_class_ssl

alias of redis.connection.SSLConnection

fanout_patterns = True

If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

fanout_prefix = True

Transport option to disable fanout keyprefix. Can also be string, in which case it changes the default prefix (‘/{db}.’) into to something else. The prefix must include a leading slash and a trailing dot.

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps')
get_table(exchange)[source]

Get table of bindings for exchange.

global_keyprefix = ''

The global key prefix will be prepended to all keys used by Kombu, which can be useful when a redis database is shared by different users. By default, no prefix is prepended.

health_check_interval = 25
keyprefix_fanout = '/{db}.'
keyprefix_queue = '_kombu.binding.%s'
max_connections = 10
property pool
priority(n)[source]
priority_steps = [0, 3, 6, 9]
queue_order_strategy = 'round_robin'

Order in which we consume from queues.

Can be either string alias, or a cycle strategy class

  • round_robin (round_robin_cycle).

    Make sure each queue has an equal opportunity to be consumed from.

  • sorted (sorted_cycle).

    Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.

  • priority (priority_cycle).

    Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.

The default is to consume from queues in round robin.

retry_on_timeout = None
sep = '\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
property subclient

Pub/Sub connection used to consume fanout queues.

supports_fanout = True

flag set if the channel supports fanout exchanges.

unacked_index_key = 'unacked_index'
unacked_key = 'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = 'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600

SentinelChannel

class kombu.transport.redis.SentinelChannel(*args, **kwargs)[source]

Channel with explicit Redis Sentinel knowledge.

Broker url is supposed to look like:

sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

where each sentinel is separated by a ;.

Other arguments for the sentinel should come from the transport options (see transport_options of Connection).

You must provide at least one option in Transport options:
  • master_name - name of the redis group to poll

Example:

>>> import kombu
>>> c = kombu.Connection(
     'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
     transport_options={'master_name': 'mymaster'}
)
>>> c.connect()
connection_class

alias of redis.sentinel.SentinelManagedConnection

connection_class_ssl

alias of kombu.transport.redis.SentinelManagedSSLConnection

from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'health_check_interval', 'retry_on_timeout', 'priority_steps', 'master_name', 'min_other_sentinels', 'sentinel_kwargs')