Twisted Connection Adapter

Using Pika with a Twisted reactor.

The interfaces in this module are Deferred-based when possible. This means that the connection.channel() method and most of the channel methods return Deferreds instead of taking a callback argument and that basic_consume() returns a Twisted DeferredQueue where messages from the server will be stored. Refer to the docstrings for TwistedProtocolConnection.channel() and the TwistedChannel class for details.

class pika.adapters.twisted_connection.TwistedProtocolConnection(parameters=None, custom_reactor=None)[source]

A Pika-specific implementation of a Twisted Protocol. Allows using Twisted’s non-blocking connectTCP/connectSSL methods for connecting to the server.

TwistedProtocolConnection objects have a ready instance variable that’s a Deferred which fires when the connection is ready to be used (the initial AMQP handshaking has been done). You have to wait for this Deferred to fire before requesting a channel.

Once the connection is ready, you will be able to use the closed instance variable: a Deferred which fires when the connection is closed.

Since it’s Twisted handling connection establishing it does not accept connect callbacks, you have to implement that within Twisted. Also remember that the host, port and ssl values of the connection parameters are ignored because, yet again, it’s Twisted who manages the connection.

channel(channel_number=None)[source]

Create a new channel with the next available channel number or pass in a channel number to use. Must be non-zero if you would like to specify but it is recommended that you let Pika manage the channel numbers.

Parameters

channel_number (int) – The channel number to use, defaults to the next available.

Returns

a Deferred that fires with an instance of a wrapper around the Pika Channel class.

Return type

Deferred

connectionLost(reason=<twisted.python.failure.Failure twisted.internet.error.ConnectionDone: Connection was closed cleanly.>)[source]

Called when the connection is shut down.

Clear any circular references here, and any external references to this Protocol. The connection has been closed.

@type reason: L{twisted.python.failure.Failure}

connectionMade()

Called when a connection is made.

This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here.

connectionReady()[source]

This method will be called when the underlying connection is ready.

dataReceived(data)[source]

Called whenever data is received.

Use this method to translate to a higher-level message. Usually, some callback will be made upon the receipt of each complete protocol message.

@param data: a string of indeterminate length. Please keep in mind

that you will probably need to buffer some data, as partial (or multiple) protocol messages may be received! I recommend that unit tests for protocols call through to this method with differing chunk sizes, down to one byte at a time.

logPrefix()

Return a prefix matching the class name, to identify log messages related to this protocol instance.

makeConnection(transport)[source]

Make a connection to a transport and a server.

This sets the ‘transport’ attribute of this Protocol, and calls the connectionMade() callback.

class pika.adapters.twisted_connection.TwistedChannel(channel)[source]

A wrapper around Pika’s Channel.

Channel methods that normally take a callback argument are wrapped to return a Deferred that fires with whatever would be passed to the callback. If the channel gets closed, all pending Deferreds are errbacked with a ChannelClosed exception. The returned Deferreds fire with whatever arguments the callback to the original method would receive.

Some methods like basic_consume and basic_get are wrapped in a special way, see their docstrings for details.

add_on_return_callback(callback)[source]

Pass a callback function that will be called when a published message is rejected and returned by the server via Basic.Return.

Parameters

callback (callable) – The method to call on callback with the message as only argument. The message is a named tuple with the following attributes: channel: this TwistedChannel method: pika.spec.Basic.Return properties: pika.spec.BasicProperties body: bytes

basic_ack(delivery_tag=0, multiple=False)[source]

Acknowledge one or more messages. When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.

Parameters
  • delivery_tag (integer) – int/long The server-assigned delivery tag

  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.

basic_cancel(consumer_tag='')[source]

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.

This method wraps Channel.basic_cancel and closes any deferred queue associated with that consumer.

Parameters

consumer_tag (str) – Identifier for the consumer

Returns

Deferred that fires on the Basic.CancelOk response

Return type

Deferred

Raises

ValueError

basic_consume(queue, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None)[source]

Consume from a server queue.

Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds messages for the consumer_tag to a ClosableDeferredQueue. If you do not pass in a consumer_tag, one will be automatically generated for you.

For more information on basic_consume, see: Tutorial 2 at http://www.rabbitmq.com/getstarted.html http://www.rabbitmq.com/confirms.html http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume

Parameters
  • queue (str) – The queue to consume from. Use the empty string to specify the most recent server-named queue for this channel.

  • auto_ack (bool) – if set to True, automatic acknowledgement mode will be used (see http://www.rabbitmq.com/confirms.html). This corresponds with the ‘no_ack’ parameter in the basic.consume AMQP 0.9.1 method

  • exclusive (bool) – Don’t allow other consumers on the queue

  • consumer_tag (str) – Specify your own consumer tag

  • arguments (dict) – Custom key/value pair arguments for the consumer

Returns

Deferred that fires with a tuple (queue_object, consumer_tag). The Deferred will errback with an instance of exceptions.ChannelClosed if the call fails. The queue object is an instance of ClosableDeferredQueue, where data received from the queue will be stored. Clients should use its get() method to fetch an individual message, which will return a Deferred firing with a namedtuple whose attributes are:

  • channel: this TwistedChannel

  • method: pika.spec.Basic.Deliver

  • properties: pika.spec.BasicProperties

  • body: bytes

Return type

Deferred

basic_get(queue, auto_ack=False)[source]

Get a single message from the AMQP broker.

Will return If the queue is empty, it will return None. If you want to be notified of Basic.GetEmpty, use the Channel.add_callback method adding your Basic.GetEmpty callback which should expect only one parameter, frame. Due to implementation details, this cannot be called a second time until the callback is executed. For more information on basic_get and its parameters, see:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get

This method wraps Channel.basic_get.

Parameters
  • queue (str) – The queue from which to get a message. Use the empty string to specify the most recent server-named queue for this channel.

  • auto_ack (bool) – Tell the broker to not expect a reply

Returns

Deferred that fires with a namedtuple whose attributes are:
  • channel: this TwistedChannel

  • method: pika.spec.Basic.GetOk

  • properties: pika.spec.BasicProperties

  • body: bytes

If the queue is empty, None will be returned.

Return type

Deferred

Raises

pika.exceptions.DuplicateGetOkCallback

basic_nack(delivery_tag=None, multiple=False, requeue=True)[source]

This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters
  • delivery-tag (integer) – int/long The server-assigned delivery tag

  • multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.

  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.

basic_publish(exchange, routing_key, body, properties=None, mandatory=False)[source]

Publish to the channel with the given exchange, routing key and body.

This method wraps Channel.basic_publish, but makes sure the channel is not closed before publishing.

For more information on basic_publish and what the parameters do, see:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish

Parameters
  • exchange (str) – The exchange to publish to

  • routing_key (str) – The routing key to bind on

  • body (bytes) – The message body

  • properties (pika.spec.BasicProperties) – Basic.properties

  • mandatory (bool) – The mandatory flag

Returns

A Deferred that fires with the result of the channel’s basic_publish.

Return type

Deferred

Raises
  • UnroutableError – raised when a message published in publisher-acknowledgments mode (see BlockingChannel.confirm_delivery) is returned via Basic.Return followed by Basic.Ack.

  • NackError – raised when a message published in publisher-acknowledgements mode is Nack’ed by the broker. See BlockingChannel.confirm_delivery.

basic_qos(prefetch_size=0, prefetch_count=0, global_qos=False)[source]

Specify quality of service. This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement.

Parameters
  • prefetch_size (int) – This field specifies the prefetch window size. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored by consumers who have enabled the no-ack option.

  • prefetch_count (int) – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored by consumers who have enabled the no-ack option.

  • global_qos (bool) – Should the QoS apply to all channels on the connection.

Returns

Deferred that fires on the Basic.QosOk response

Return type

Deferred

basic_recover(requeue=False)[source]

This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.

Parameters

requeue (bool) – If False, the message will be redelivered to the original recipient. If True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

Returns

Deferred that fires on the Basic.RecoverOk response

Return type

Deferred

basic_reject(delivery_tag, requeue=True)[source]

Reject an incoming message. This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters
  • delivery_tag (integer) – int/long The server-assigned delivery tag

  • requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.

Raises

TypeError

callback_deferred(deferred, replies)[source]

Pass in a Deferred and a list replies from the RabbitMQ broker which you’d like the Deferred to be callbacked on with the frame as callback value.

Parameters
  • deferred (Deferred) – The Deferred to callback

  • replies (list) – The replies to callback on

close(reply_code=0, reply_text='Normal shutdown')[source]

Invoke a graceful shutdown of the channel with the AMQP Broker.

If channel is OPENING, transition to CLOSING and suppress the incoming Channel.OpenOk, if any.

Parameters
  • reply_code (int) – The reason code to send to broker

  • reply_text (str) – The reason text to send to broker

Raises

ChannelWrongStateError – if channel is closed or closing

confirm_delivery()[source]

Turn on Confirm mode in the channel. Pass in a callback to be notified by the Broker when a message has been confirmed as received or rejected (Basic.Ack, Basic.Nack) from the broker to the publisher.

For more information see:

http://www.rabbitmq.com/confirms.html#publisher-confirms

Returns

Deferred that fires on the Confirm.SelectOk response

Return type

Deferred

exchange_bind(destination, source, routing_key='', arguments=None)[source]

Bind an exchange to another exchange.

Parameters
  • destination (str) – The destination exchange to bind

  • source (str) – The source exchange to bind to

  • routing_key (str) – The routing key to bind on

  • arguments (dict) – Custom key/value pair arguments for the binding

Raises

ValueError

Returns

Deferred that fires on the Exchange.BindOk response

Return type

Deferred

exchange_declare(exchange, exchange_type=ExchangeType.direct, passive=False, durable=False, auto_delete=False, internal=False, arguments=None)[source]

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).

Parameters
  • exchange (str) – The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon

  • exchange_type (str) – The exchange type to use

  • passive (bool) – Perform a declare or just check to see if it exists

  • durable (bool) – Survive a reboot of RabbitMQ

  • auto_delete (bool) – Remove when no more queues are bound to it

  • internal (bool) – Can only be published to by other exchanges

  • arguments (dict) – Custom key/value pair arguments for the exchange

Returns

Deferred that fires on the Exchange.DeclareOk response

Return type

Deferred

Raises

ValueError

exchange_delete(exchange=None, if_unused=False)[source]

Delete the exchange.

Parameters
  • exchange (str) – The exchange name

  • if_unused (bool) – only delete if the exchange is unused

Returns

Deferred that fires on the Exchange.DeleteOk response

Return type

Deferred

Raises

ValueError

exchange_unbind(destination=None, source=None, routing_key='', arguments=None)[source]

Unbind an exchange from another exchange.

Parameters
  • destination (str) – The destination exchange to unbind

  • source (str) – The source exchange to unbind from

  • routing_key (str) – The routing key to unbind

  • arguments (dict) – Custom key/value pair arguments for the binding

Returns

Deferred that fires on the Exchange.UnbindOk response

Return type

Deferred

Raises

ValueError

flow(active)[source]

Turn Channel flow control off and on.

Returns a Deferred that will fire with a bool indicating the channel flow state. For more information, please reference:

http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow

Parameters

active (bool) – Turn flow on or off

Returns

Deferred that fires with the channel flow state

Return type

Deferred

Raises

ValueError

property is_closed

Returns True if the channel is closed.

Return type

bool

property is_closing

Returns True if client-initiated closing of the channel is in progress.

Return type

bool

property is_open

Returns True if the channel is open.

Return type

bool

open()[source]

Open the channel

queue_bind(queue, exchange, routing_key=None, arguments=None)[source]

Bind the queue to the specified exchange

Parameters
  • queue (str) – The queue to bind to the exchange

  • exchange (str) – The source exchange to bind to

  • routing_key (str) – The routing key to bind on

  • arguments (dict) – Custom key/value pair arguments for the binding

Returns

Deferred that fires on the Queue.BindOk response

Return type

Deferred

Raises

ValueError

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)[source]

Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Use an empty string as the queue name for the broker to auto-generate one

Parameters
  • queue (str) – The queue name; if empty string, the broker will create a unique queue name

  • passive (bool) – Only check to see if the queue exists

  • durable (bool) – Survive reboots of the broker

  • exclusive (bool) – Only allow access by the current connection

  • auto_delete (bool) – Delete after consumer cancels or disconnects

  • arguments (dict) – Custom key/value arguments for the queue

Returns

Deferred that fires on the Queue.DeclareOk response

Return type

Deferred

Raises

ValueError

queue_delete(queue, if_unused=False, if_empty=False)[source]

Delete a queue from the broker.

This method wraps Channel.queue_delete, and removes the reference to the queue object after it gets deleted on the server.

Parameters
  • queue (str) – The queue to delete

  • if_unused (bool) – only delete if it’s unused

  • if_empty (bool) – only delete if the queue is empty

Returns

Deferred that fires on the Queue.DeleteOk response

Return type

Deferred

Raises

ValueError

queue_purge(queue)[source]

Purge all of the messages from the specified queue

Parameters

queue (str) – The queue to purge

Returns

Deferred that fires on the Queue.PurgeOk response

Return type

Deferred

Raises

ValueError

queue_unbind(queue, exchange=None, routing_key=None, arguments=None)[source]

Unbind a queue from an exchange.

Parameters
  • queue (str) – The queue to unbind from the exchange

  • exchange (str) – The source exchange to bind from

  • routing_key (str) – The routing key to unbind

  • arguments (dict) – Custom key/value pair arguments for the binding

Returns

Deferred that fires on the Queue.UnbindOk response

Return type

Deferred

Raises

ValueError

tx_commit()[source]

Commit a transaction.

Returns

Deferred that fires on the Tx.CommitOk response

Return type

Deferred

Raises

ValueError

tx_rollback()[source]

Rollback a transaction.

Returns

Deferred that fires on the Tx.RollbackOk response

Return type

Deferred

Raises

ValueError

tx_select()[source]

Select standard transaction mode. This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.

Returns

Deferred that fires on the Tx.SelectOk response

Return type

Deferred

Raises

ValueError

class pika.adapters.twisted_connection.ClosableDeferredQueue(size=None, backlog=None)[source]

Like the normal Twisted DeferredQueue, but after close() is called with an exception instance all pending Deferreds are errbacked and further attempts to call get() or put() return a Failure wrapping that exception.

close(reason)[source]

Closes the queue.

Errback the pending calls to get().

get()[source]

Returns a Deferred that will fire with the next item in the queue, when it’s available.

The Deferred will errback if the queue is closed.

Returns

Deferred that fires with the next item.

Return type

Deferred

put(obj)[source]

Like the original DeferredQueue.put() method, but returns an errback if the queue is closed.