This document describes Kombu's development version, which can differ significantly from previous releases. The stable documentation covers version 5.0.

MongoDB Transport - kombu.transport.mongodb

MongoDB transport module for kombu.

Features

  • Type: Virtual

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: Yes

  • Supports TTL: Yes

Connection String

Unreviewed
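Because this section is still unreviewed, the following is only a minimal sketch of what a connection URL might look like, assembled from the channel defaults listed further down this page (default_hostname, default_port, default_database). The helpers mongodb_url and open_connection are hypothetical names introduced for illustration and are not part of Kombu; the mongodb:// scheme and the use of kombu.Connection are assumptions about a typical setup.

```python
from urllib.parse import urlsplit

# Defaults copied from the Channel attributes documented on this page.
DEFAULT_HOSTNAME = "127.0.0.1"
DEFAULT_PORT = 27017
DEFAULT_DATABASE = "kombu_default"


def mongodb_url(host=DEFAULT_HOSTNAME, port=DEFAULT_PORT, database=DEFAULT_DATABASE):
    """Build a mongodb:// URL (hypothetical helper, not part of Kombu)."""
    return f"mongodb://{host}:{port}/{database}"


def open_connection(url):
    """Create a Kombu connection for the URL (needs kombu and pymongo installed)."""
    # Imported lazily so the URL helper above stays stdlib-only.
    from kombu import Connection
    return Connection(url)


url = mongodb_url()
parts = urlsplit(url)  # split the URL back into host, port and database path
print(url)
```

Calling open_connection(url) would only create the connection object; reaching a server additionally assumes a MongoDB instance listening on that host and port.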

Transport Options

  • connect_timeout

  • ssl

  • ttl

  • capped_queue_size

  • default_hostname

  • default_port

  • default_database

  • messages_collection

  • routing_collection

  • broadcast_collection

  • queues_collection

  • calc_queue_size
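As a rough sketch of how these options might be supplied, the snippet below builds a transport_options dictionary and checks its keys against the names in from_transport_options as listed on this page. The helper unknown_options and the example values ('jobs', 50000) are hypothetical; passing the dictionary through the transport_options argument of kombu.Connection is an assumption about typical usage, not something this page states.

```python
# Option names copied from Channel.from_transport_options on this page.
FROM_TRANSPORT_OPTIONS = (
    "body_encoding", "deadletter_queue", "connect_timeout", "ssl", "ttl",
    "capped_queue_size", "default_hostname", "default_port",
    "default_database", "messages_collection", "routing_collection",
    "broadcast_collection", "queues_collection", "calc_queue_size",
)


def unknown_options(options):
    """Return the option names the channel would not recognise (hypothetical helper)."""
    return sorted(set(options) - set(FROM_TRANSPORT_OPTIONS))


# Illustrative overrides only; the values are made up for this example.
transport_options = {
    "messages_collection": "jobs",
    "capped_queue_size": 50000,
    "calc_queue_size": False,
}


def open_connection(url, options):
    """Create a Kombu connection with the options (needs kombu and pymongo installed)."""
    from kombu import Connection  # lazy import keeps the check above stdlib-only
    return Connection(url, transport_options=options)


print(unknown_options(transport_options))
```

Checking the keys up front is a design choice for this sketch: a misspelled option name is reported by the helper before any connection is attempted.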

Transport

class kombu.transport.mongodb.Transport(client, **kwargs)

MongoDB Transport.

class Channel(*vargs, **kwargs)

MongoDB Channel.

property broadcast
broadcast_collection = 'messages.broadcast'
calc_queue_size = True
capped_queue_size = 100000
property client
connect_timeout = None
default_database = 'kombu_default'
default_hostname = '127.0.0.1'
default_port = 27017
from_transport_options = ('body_encoding', 'deadletter_queue', 'connect_timeout', 'ssl', 'ttl', 'capped_queue_size', 'default_hostname', 'default_port', 'default_database', 'messages_collection', 'routing_collection', 'broadcast_collection', 'queues_collection', 'calc_queue_size')
get_now()

Return current time in UTC.

get_table(exchange)

Get table of bindings for exchange.

property messages
messages_collection = 'messages'
prepare_queue_arguments(arguments, **kwargs)
queue_delete(queue, **kwargs)

Delete queue.

property queues
queues_collection = 'messages.queues'
property routing
routing_collection = 'messages.routing'
ssl = False
supports_fanout = True

Flag set if the channel supports fanout exchanges.

ttl = False
can_parse_url = True

Set to True if Connection should pass the URL unmodified.

channel_errors = (<class 'amqp.exceptions.ChannelError'>, <class 'pymongo.errors.ConnectionFailure'>, <class 'pymongo.errors.OperationFailure'>)

Tuple of errors that can happen due to channel/method failure.

connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'pymongo.errors.ConnectionFailure'>)

Tuple of errors that can happen due to connection failure.
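Error tuples like these are shaped so they can be used directly in an except clause. The following is a minimal sketch of a retry loop driven by such a tuple; call_with_retry and FakeConnectionFailure are hypothetical names, and the fake exception stands in for pymongo.errors.ConnectionFailure so the example runs without a broker.

```python
import time


def call_with_retry(func, retry_on, attempts=3, delay=0.0):
    """Call func(), retrying when it raises one of the retry_on exceptions.

    Hypothetical helper, not part of Kombu.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(delay)


class FakeConnectionFailure(Exception):
    """Stand-in for pymongo.errors.ConnectionFailure (assumption for the demo)."""


calls = []


def flaky_publish():
    """Fail twice, then succeed, to exercise the retry path."""
    calls.append(1)
    if len(calls) < 3:
        raise FakeConnectionFailure("broker unreachable")
    return "published"


result = call_with_retry(flaky_publish, retry_on=(FakeConnectionFailure,))
print(result, len(calls))
```

With a real transport, the retry_on argument would presumably be the transport's connection_errors tuple rather than the stand-in class used here.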

default_port = 27017

Port number used when no port is specified.

driver_name = 'pymongo'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'mongodb'

Type of driver. Can be used to distinguish transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc.

driver_version()
implements = {'asynchronous': False, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
polling_interval = 1

Time to sleep between unsuccessful polls.
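The implements mapping above can be read programmatically. As a small sketch, the snippet below copies the documented value into a plain dictionary and checks whether a given exchange type is listed; supports_exchange is a hypothetical helper, and the 'headers' type appears only as an example of a type that is not in the set.

```python
# Value copied from Transport.implements on this page.
IMPLEMENTS = {
    "asynchronous": False,
    "exchange_type": frozenset({"direct", "fanout", "topic"}),
    "heartbeats": False,
}


def supports_exchange(exchange_type, implements=IMPLEMENTS):
    """Return True if the exchange type is listed in the mapping (hypothetical helper)."""
    return exchange_type in implements["exchange_type"]


for name in ("direct", "topic", "fanout", "headers"):
    print(name, supports_exchange(name))
```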

Channel

class kombu.transport.mongodb.Channel(*vargs, **kwargs)

MongoDB Channel.

property broadcast
broadcast_collection = 'messages.broadcast'
calc_queue_size = True
capped_queue_size = 100000
property client
connect_timeout = None
default_database = 'kombu_default'
default_hostname = '127.0.0.1'
default_port = 27017
from_transport_options = ('body_encoding', 'deadletter_queue', 'connect_timeout', 'ssl', 'ttl', 'capped_queue_size', 'default_hostname', 'default_port', 'default_database', 'messages_collection', 'routing_collection', 'broadcast_collection', 'queues_collection', 'calc_queue_size')
get_now()

Return current time in UTC.

get_table(exchange)

Get table of bindings for exchange.

property messages
messages_collection = 'messages'
prepare_queue_arguments(arguments, **kwargs)
queue_delete(queue, **kwargs)

Delete queue.

property queues
queues_collection = 'messages.queues'
property routing
routing_collection = 'messages.routing'
ssl = False
supports_fanout = True

Flag set if the channel supports fanout exchanges.

ttl = False