BlockingConnection
The blocking connection adapter module implements blocking semantics on top of Pika’s core AMQP driver. While most of the asynchronous expectations are removed when using the blocking connection adapter, it attempts to remain true to the asynchronous RPC nature of the AMQP protocol, supporting server sent RPC commands.
The user-facing classes in the module consist of the BlockingConnection and the BlockingChannel classes.
Be sure to check out examples in Usage Examples.
- class pika.adapters.blocking_connection.BlockingConnection(parameters=None, _impl_class=None)
The BlockingConnection creates a layer on top of Pika’s asynchronous core providing methods that will block until their expected response has returned. Due to the asynchronous nature of the Basic.Deliver and Basic.Return calls from RabbitMQ to your application, you can still implement continuation-passing style asynchronous methods if you’d like to receive messages from RabbitMQ using basic_consume or if you want to be notified of a delivery failure when using basic_publish.
For more information about communicating with the blocking_connection adapter, be sure to check out the BlockingChannel class, which implements the Channel based communication for the blocking_connection adapter.
To prevent recursion/reentrancy, the blocking connection and channel implementations queue asynchronously-delivered events received in nested context (e.g., while waiting for BlockingConnection.channel or BlockingChannel.queue_declare to complete), dispatching them synchronously once nesting returns to the desired context. This concerns all callbacks, such as those registered via BlockingConnection.call_later, BlockingConnection.add_on_connection_blocked_callback, BlockingConnection.add_on_connection_unblocked_callback, BlockingChannel.basic_consume, etc.
Blocked Connection deadlock avoidance: when RabbitMQ becomes low on resources, it emits Connection.Blocked (an AMQP extension) to the client connection when the client makes a resource-consuming request on that connection or its channel (e.g., Basic.Publish); subsequently, RabbitMQ suspends processing requests from that connection until the affected resources are restored. See http://www.rabbitmq.com/connection-blocked.html.
This may impact BlockingConnection and BlockingChannel operations in a way that users might not be expecting. For example, if the user dispatches BlockingChannel.basic_publish in non-publisher-confirmation mode while RabbitMQ is in this low-resource state, followed by a synchronous request (e.g., BlockingConnection.channel, BlockingChannel.consume, BlockingChannel.basic_consume, etc.), the synchronous request will block indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If the blocked state persists for a long time, the blocking operation will appear to hang. In this state, the BlockingConnection instance and its channels will not dispatch user callbacks.
SOLUTION: To break this potential deadlock, applications may configure the blocked_connection_timeout connection parameter when instantiating BlockingConnection. Upon blocked connection timeout, this adapter will raise the ConnectionBlockedTimeout exception. See the pika.connection.ConnectionParameters documentation to learn more about the blocked_connection_timeout configuration.
- add_callback_threadsafe(callback)
Requests a call to the given function as soon as possible in the context of this connection’s thread.
NOTE: This is the only thread-safe method in BlockingConnection. All other manipulations of BlockingConnection must be performed from the connection’s thread.
NOTE: the callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events() and BlockingChannel.start_consuming().
For example, a thread may request a call to the BlockingChannel.basic_ack method of a BlockingConnection that is running in a different thread via:
    connection.add_callback_threadsafe(
        functools.partial(channel.basic_ack, delivery_tag=...))
NOTE: if you know that the requester is running on the same thread as the connection it is more efficient to use the BlockingConnection.call_later() method with a delay of 0.
- Parameters
callback (callable) – The callback method; must be callable
- Raises
pika.exceptions.ConnectionWrongStateError – if connection is closed
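The hand-off described above can be sketched as follows: a worker thread finishes processing a message and asks the connection's thread to send the acknowledgement. The helper names ack_message and process_in_worker are hypothetical and not part of pika; connection and channel are assumed to be an open BlockingConnection and one of its BlockingChannel objects.

```python
import functools
import threading


def ack_message(connection, channel, delivery_tag):
    """Ask the connection's own thread to send Basic.Ack for delivery_tag.

    Only add_callback_threadsafe touches the connection here;
    channel.basic_ack itself runs later, on the connection's thread.
    """
    callback = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(callback)


def process_in_worker(connection, channel, delivery_tag, body, handler):
    """Run handler(body) in a worker thread, then request the ack.

    Hypothetical helper: returns the started thread so the caller can join it.
    """

    def work():
        handler(body)
        ack_message(connection, channel, delivery_tag)

    worker = threading.Thread(target=work)
    worker.start()
    return worker
```

The requested ack is only sent once the connection's thread is inside BlockingConnection.process_data_events() or BlockingChannel.start_consuming(), as noted above.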
- add_on_connection_blocked_callback(callback)
RabbitMQ AMQP extension - Add a callback to be notified when the connection gets blocked (Connection.Blocked received from RabbitMQ) due to the broker running low on resources (memory or disk). In this state RabbitMQ suspends processing incoming data until the connection is unblocked, so it’s a good idea for publishers receiving this notification to suspend publishing until the connection becomes unblocked.
NOTE: due to the blocking nature of BlockingConnection, if it’s sending outbound data while the connection is/becomes blocked, the call may remain blocked until the connection becomes unblocked, if ever. You may use ConnectionParameters.blocked_connection_timeout to abort a BlockingConnection method call with an exception when the connection remains blocked longer than the given timeout value.
See also Connection.add_on_connection_unblocked_callback()
See also ConnectionParameters.blocked_connection_timeout.
- Parameters
callback (callable) – Callback to call on Connection.Blocked, having the signature callback(connection, pika.frame.Method), where connection is the BlockingConnection instance and the method frame’s method member is of type pika.spec.Connection.Blocked
- add_on_connection_unblocked_callback(callback)
RabbitMQ AMQP extension - Add a callback to be notified when the connection gets unblocked (Connection.Unblocked frame is received from RabbitMQ) letting publishers know it’s ok to start publishing again.
- Parameters
callback (callable) – Callback to call on Connection.Unblocked, having the signature callback(connection, pika.frame.Method), where connection is the BlockingConnection instance and the method frame’s method member is of type pika.spec.Connection.Unblocked
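As a rough illustration of the blocked and unblocked notifications, the sketch below keeps a flag that a publisher could check before sending. PublishGate is a hypothetical helper, not part of pika, and reading a reason attribute from the Connection.Blocked method is an assumption, so it is fetched defensively.

```python
class PublishGate:
    """Tracks whether the broker has blocked the connection (hypothetical helper)."""

    def __init__(self):
        self.blocked = False
        self.reason = None

    def on_blocked(self, connection, method_frame):
        # method_frame.method is expected to be a pika.spec.Connection.Blocked;
        # its "reason" attribute is read defensively (an assumption).
        self.blocked = True
        self.reason = getattr(method_frame.method, "reason", None)

    def on_unblocked(self, connection, method_frame):
        self.blocked = False
        self.reason = None

    def register(self, connection):
        """Attach both callbacks to a BlockingConnection and return self."""
        connection.add_on_connection_blocked_callback(self.on_blocked)
        connection.add_on_connection_unblocked_callback(self.on_unblocked)
        return self
```

A publisher would then skip or delay basic_publish while gate.blocked is true. The callbacks themselves are only dispatched while the connection is processing events.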
- property basic_nack
Specifies if the server supports basic.nack on the active connection.
- Return type
bool
- property basic_nack_supported
Specifies if the server supports basic.nack on the active connection.
- Return type
bool
- call_later(delay, callback)
Create a single-shot timer to fire after delay seconds. Do not confuse with Tornado’s timeout where you pass in the time you want to have your callback called. Only pass in the seconds until it’s to be called.
NOTE: the timer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events() and BlockingChannel.start_consuming().
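Because the timer is single-shot, a recurring task has to re-arm it after each run. The sketch below shows one way to do that; PeriodicTask is a hypothetical helper, and it assumes that call_later returns the opaque timer id accepted by remove_timeout.

```python
class PeriodicTask:
    """Re-arms a single-shot call_later timer after each run (hypothetical helper)."""

    def __init__(self, connection, interval, action):
        self._connection = connection
        self._interval = interval  # seconds between runs
        self._action = action
        self._timer_id = None
        self._running = False

    def start(self):
        self._running = True
        self._arm()

    def _arm(self):
        # Assumption: call_later returns an opaque id usable with remove_timeout.
        self._timer_id = self._connection.call_later(self._interval, self._fire)

    def _fire(self):
        self._timer_id = None  # this timer has already fired
        self._action()
        if self._running:  # the action may have called stop()
            self._arm()

    def stop(self):
        self._running = False
        if self._timer_id is not None:
            self._connection.remove_timeout(self._timer_id)
            self._timer_id = None
```

The action only runs while the connection's thread is inside process_data_events() or start_consuming(), so the interval is a lower bound rather than an exact period.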
- channel(channel_number=None)
Create a new channel with the next available channel number, or pass in a channel number to use. A channel number that you specify must be non-zero, but it is recommended that you let Pika manage the channel numbers.
- close(reply_code=200, reply_text='Normal shutdown')
Disconnect from RabbitMQ. If there are any open channels, it will attempt to close them prior to fully disconnecting. Channels which have active consumers will attempt to send a Basic.Cancel to RabbitMQ to cleanly stop the delivery of messages prior to closing the channel.
- Parameters
- Raises
pika.exceptions.ConnectionWrongStateError – if called on a closed connection (NEW in v1.0.0)
- property consumer_cancel_notify
Specifies if the server supports consumer cancel notification on the active connection.
- Return type
bool
- property consumer_cancel_notify_supported
Specifies if the server supports consumer cancel notification on the active connection.
- Return type
bool
- property exchange_exchange_bindings
Specifies if the active connection supports exchange to exchange bindings.
- Return type
bool
- property exchange_exchange_bindings_supported
Specifies if the active connection supports exchange to exchange bindings.
- Return type
bool
- property is_closed
Returns a boolean reporting the current connection state.
- property is_open
Returns a boolean reporting the current connection state.
- process_data_events(time_limit=0)
Will make sure that data events are processed. Dispatches timer and channel callbacks if not called from the scope of BlockingConnection or BlockingChannel callback. Your app can block on this method.
- Parameters
time_limit (float) – suggested upper bound on processing time in seconds. The actual blocking time depends on the granularity of the underlying ioloop. Zero means return as soon as possible. None means there is no limit on processing time and the function will block until I/O produces actionable events. Defaults to 0 for backward compatibility. This parameter is NEW in pika 0.10.0.
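An application that owns its main loop can block on this method in short slices and check its own stop condition in between. The following is a minimal sketch of that pattern; pump_events and its parameters are hypothetical names, and connection is assumed to be an open BlockingConnection.

```python
import time


def pump_events(connection, should_stop, poll_interval=1.0, max_seconds=None):
    """Drive connection I/O until should_stop() is true or max_seconds elapse.

    Returns the number of process_data_events calls that were made.
    """
    deadline = None if max_seconds is None else time.monotonic() + max_seconds
    calls = 0
    while not should_stop():
        if deadline is not None and time.monotonic() >= deadline:
            break
        # Blocks for roughly poll_interval seconds at most, dispatching
        # timers and channel callbacks along the way.
        connection.process_data_events(time_limit=poll_interval)
        calls += 1
    return calls
```

A short poll_interval keeps the stop check responsive at the cost of more wake-ups. Passing time_limit=None to process_data_events instead would block until I/O produces actionable events.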
- property publisher_confirms
Specifies if the active connection can use publisher confirmations.
- Return type
bool
- property publisher_confirms_supported
Specifies if the active connection can use publisher confirmations.
- Return type
bool
- remove_timeout(timeout_id)
Remove a timer if it’s still in the timeout stack
- Parameters
timeout_id – The opaque timer id to remove
- sleep(duration)
A safer way to sleep than calling time.sleep() directly, since it keeps the adapter from ignoring frames sent from the broker. The connection will “sleep”, or block, for the number of seconds specified in duration, in small intervals.
- Parameters
duration (float) – The time to sleep in seconds
- class pika.adapters.blocking_connection.BlockingChannel(channel_impl, connection)
The BlockingChannel implements blocking semantics for most things that one would use callback-passing-style for with the Channel class. In addition, the BlockingChannel class implements a generator that allows you to consume messages without using callbacks.
Example of creating a BlockingChannel:
    import pika

    # Create our connection object
    connection = pika.BlockingConnection()

    # The returned object will be a synchronous channel
    channel = connection.channel()
- add_on_cancel_callback(callback)
Pass a callback function that will be called when Basic.Cancel is sent by the broker. The callback function should receive a method frame parameter.
- Parameters
callback (callable) – a callable for handling broker’s Basic.Cancel notification with the call signature: callback(method_frame) where method_frame is of type pika.frame.Method with method of type spec.Basic.Cancel
- add_on_return_callback(callback)
Pass a callback function that will be called when a published message is rejected and returned by the server via Basic.Return.
- Parameters
callback (callable) –
The method to call on callback with the signature callback(channel, method, properties, body), where
channel: pika.Channel
method: pika.spec.Basic.Return
properties: pika.spec.BasicProperties
body: bytes
- basic_ack(delivery_tag=0, multiple=False)
Acknowledge one or more messages. When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.
- Parameters
delivery_tag (int) – The server-assigned delivery tag
multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.
- basic_cancel(consumer_tag)
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
NOTE: When cancelling an auto_ack=False consumer, this implementation automatically Nacks and suppresses any incoming messages that have not yet been dispatched to the consumer’s callback. However, when cancelling an auto_ack=True consumer, this method will return any pending messages that arrived before the broker confirmed the cancellation.
- Parameters
consumer_tag (str) – Identifier for the consumer; the result of passing a consumer_tag that was created on another channel is undefined (bad things will happen)
- Returns
(NEW IN pika 0.10.0) empty sequence for an auto_ack=False consumer; for an auto_ack=True consumer, returns a (possibly empty) sequence of pending messages that arrived before the broker confirmed the cancellation (this is done instead of via the consumer’s callback in order to prevent reentrancy/recursion). Each message is a four-tuple: (channel, method, properties, body)
channel: BlockingChannel
method: spec.Basic.Deliver
properties: spec.BasicProperties
body: bytes
- Return type
list
- basic_consume(queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None)
Sends the AMQP command Basic.Consume to the broker and binds messages for the consumer_tag to the consumer callback. If you do not pass in a consumer_tag, one will be automatically generated for you. Returns the consumer tag.
NOTE: the consumer callbacks are dispatched only in the scope of specially-designated methods: see BlockingConnection.process_data_events and BlockingChannel.start_consuming.
For more information about Basic.Consume, see: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
- Parameters
queue (str) – The queue from which to consume
on_message_callback (callable) –
Required function for dispatching messages to user, having the signature: on_message_callback(channel, method, properties, body)
channel: BlockingChannel
method: spec.Basic.Deliver
properties: spec.BasicProperties
body: bytes
auto_ack (bool) – if set to True, automatic acknowledgement mode will be used (see http://www.rabbitmq.com/confirms.html). This corresponds with the ‘no_ack’ parameter in the basic.consume AMQP 0.9.1 method
exclusive (bool) – Don’t allow other consumers on the queue
consumer_tag (str) – You may specify your own consumer tag; if left empty, a consumer tag will be generated automatically
arguments (dict) – Custom key/value pair arguments for the consumer
- Returns
consumer tag
- Return type
str
- Raises
pika.exceptions.DuplicateConsumerTag – if consumer with given consumer_tag is already present.
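A callback-based consumer typically acknowledges each message after handling it. The sketch below shows one possible arrangement; make_on_message and start_worker are hypothetical helper names, and the choice to nack failed messages without requeueing is an assumption made to avoid redelivery loops.

```python
def make_on_message(handler, requeue_on_error=False):
    """Wrap handler(body) in a callback with the on_message_callback signature."""

    def on_message(channel, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Reject the message; with requeue=False the broker discards or
            # dead-letters it instead of redelivering it.
            channel.basic_nack(delivery_tag=method.delivery_tag,
                               requeue=requeue_on_error)
        else:
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message


def start_worker(channel, queue, handler, prefetch_count=1):
    """Register a manually-acknowledged consumer and return its consumer tag."""
    # Limit the number of unacknowledged messages delivered in advance.
    channel.basic_qos(prefetch_count=prefetch_count)
    return channel.basic_consume(
        queue=queue,
        on_message_callback=make_on_message(handler),
        auto_ack=False,
    )
```

After start_worker returns, the caller would run channel.start_consuming() (or call BlockingConnection.process_data_events() in a loop) so that the callback is actually dispatched.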
- basic_get(queue, auto_ack=False)
Get a single message from the AMQP broker. Returns a sequence with the method frame, message properties, and body.
- Parameters
- Returns
a three-tuple; (None, None, None) if the queue was empty; otherwise (method, properties, body); NOTE: body may be None
- Return type
(spec.Basic.GetOk|None, spec.BasicProperties|None, bytes|None)
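The three-tuple return value makes polling straightforward: a None method frame means the queue was empty. A minimal sketch follows, using the hypothetical helper name handle_one and acknowledging only after the handler has returned.

```python
def handle_one(channel, queue, handler):
    """Fetch at most one message and pass it to handler(properties, body).

    Returns True if a message was handled, False if the queue was empty.
    """
    method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
    if method is None:
        # (None, None, None): nothing was waiting in the queue.
        return False
    handler(properties, body)
    # Acknowledge only after the handler has finished without raising.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True
```

If the handler raises, the message stays unacknowledged and the exception propagates to the caller, which can then decide whether to reject the message.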
- basic_nack(delivery_tag=0, multiple=False, requeue=True)
This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
- Parameters
delivery_tag (int) – The server-assigned delivery tag
multiple (bool) – If set to True, the delivery tag is treated as “up to and including”, so that multiple messages can be rejected with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates rejection of all outstanding messages.
requeue (bool) – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
- basic_publish(exchange, routing_key, body, properties=None, mandatory=False)
Publish to the channel with the given exchange, routing key, and body.
For more information on basic_publish and what the parameters do, see:
NOTE: mandatory may be enabled even without delivery confirmation, but in the absence of delivery confirmation the synchronous implementation has no way to know how long to wait for the Basic.Return.
- Parameters
exchange (str) – The exchange to publish to
routing_key (str) – The routing key to bind on
body (bytes) – The message body; empty string if no body
properties (pika.spec.BasicProperties) – message properties
mandatory (bool) – The mandatory flag
- Raises
UnroutableError – raised when a message published in publisher-acknowledgments mode (see BlockingChannel.confirm_delivery) is returned via Basic.Return followed by Basic.Ack.
NackError – raised when a message published in publisher-acknowledgements mode is Nack’ed by the broker. See BlockingChannel.confirm_delivery.
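The two exceptions above can be turned into a simple success flag. The sketch below assumes that BlockingChannel.confirm_delivery() has already been called on the channel; publish_confirmed and its delivery_errors parameter are hypothetical, the latter existing only so the exception classes can be substituted (for instance in tests).

```python
def publish_confirmed(channel, exchange, routing_key, body, delivery_errors=None):
    """Publish one message on a confirm-mode channel.

    Returns True if the broker accepted the message, False if it was
    returned as unroutable or nacked.
    """
    if delivery_errors is None:
        # Resolved lazily so pika is only needed when the default is used.
        from pika.exceptions import NackError, UnroutableError
        delivery_errors = (UnroutableError, NackError)
    try:
        # mandatory=True asks the broker to return unroutable messages.
        channel.basic_publish(exchange=exchange, routing_key=routing_key,
                              body=body, mandatory=True)
    except delivery_errors:
        return False
    return True
```

A caller might retry or log when the function returns False. Other exceptions, such as a closed channel, are deliberately left to propagate.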
- basic_qos(prefetch_size=0, prefetch_count=0, global_qos=False)
Specify quality of service. This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement.
- Parameters
prefetch_size (int) – This field specifies the prefetch window size. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set in the consumer.
prefetch_count (int) – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set in the consumer.
global_qos (bool) – Should the QoS apply to all channels on the connection.
- basic_recover(requeue=False)
This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.
- Parameters
requeue (bool) – If False, the message will be redelivered to the original recipient. If True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
- basic_reject(delivery_tag=0, requeue=True)
Reject an incoming message. This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
- cancel()
Cancel the queue consumer created by BlockingChannel.consume, rejecting all pending ackable messages.
NOTE: If you’re looking to cancel a consumer issued with BlockingChannel.basic_consume then you should call BlockingChannel.basic_cancel.
- Returns
The number of messages requeued by Basic.Nack. NEW in 0.10.0: returns 0
- Return type
int
- property channel_number
Channel number
- close(reply_code=0, reply_text='Normal shutdown')
Will invoke a clean shutdown of the channel with the AMQP Broker.
- confirm_delivery()
Turn on RabbitMQ-proprietary Confirm mode in the channel.
- For more information see:
- property connection
The channel’s BlockingConnection instance
The channel’s BlockingConnection instance
- consume(queue, auto_ack=False, exclusive=False, arguments=None, inactivity_timeout=None)
Blocking consumption of a queue instead of via a callback. This method is a generator that yields each message as a tuple of method, properties, and body. The active generator iterator terminates when the consumer is cancelled by client via BlockingChannel.cancel() or by broker.
Example:
    for method, properties, body in channel.consume('queue'):
        print(body)
        channel.basic_ack(method.delivery_tag)
You should call BlockingChannel.cancel() when you escape out of the generator loop.
If you don’t cancel this consumer, then the next call on the same channel to consume() with the exact same (queue, auto_ack, exclusive) parameters will resume the existing consumer generator; however, calling with different parameters will result in an exception.
- Parameters
queue (str) – The queue name to consume
auto_ack (bool) – Tell the broker to not expect an ack/nack response
exclusive (bool) – Don’t allow other consumers on the queue
arguments (dict) – Custom key/value pair arguments for the consumer
inactivity_timeout (float) – if a number is given (in seconds), will cause the method to yield (None, None, None) after the given period of inactivity; this permits pseudo-regular maintenance activities to be carried out by the user while waiting for messages to arrive. If None is given (default), then the method blocks until the next event arrives. NOTE that timing granularity is limited by the timer resolution of the underlying implementation. NEW in pika 0.10.0.
- Yields
tuple(spec.Basic.Deliver, spec.BasicProperties, str or unicode)
- Raises
ValueError – if consumer-creation parameters don’t match those of the existing queue consumer generator, if any. NEW in pika 0.10.0
ChannelClosed – when this channel is closed by broker.
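The inactivity_timeout parameter can be used to stop consuming once a queue has gone quiet. The following sketch, with the hypothetical helper name drain_queue, treats the first (None, None, None) tuple as the signal to stop and cancels the generator's consumer on the way out.

```python
def drain_queue(channel, queue, handler, inactivity_timeout=1.0):
    """Handle messages until the queue is idle for inactivity_timeout seconds.

    Returns the number of messages passed to handler(body).
    """
    handled = 0
    try:
        for method, properties, body in channel.consume(
                queue, inactivity_timeout=inactivity_timeout):
            if method is None:
                # (None, None, None): no message arrived within the timeout.
                break
            handler(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)
            handled += 1
    finally:
        # Cancel the consumer created by consume(), as recommended above.
        channel.cancel()
    return handled
```

Instead of breaking out, a long-running consumer could use the idle tuple as an opportunity for periodic maintenance and then continue the loop.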
- property consumer_tags
Property method that returns a list of consumer tags for active consumers
- Return type
list
- exchange_bind(destination, source, routing_key='', arguments=None)
Bind an exchange to another exchange.
- Parameters
- Returns
Method frame from the Exchange.Bind-ok response
- Return type
pika.frame.Method having method attribute of type spec.Exchange.BindOk
- exchange_declare(exchange, exchange_type=ExchangeType.direct, passive=False, durable=False, auto_delete=False, internal=False, arguments=None)
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
If passive is set, the server will reply with Declare-Ok if an exchange with the same name already exists; if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).
- Parameters
exchange (str) – The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
exchange_type (str) – The exchange type to use
passive (bool) – Perform a declare or just check to see if it exists
durable (bool) – Survive a reboot of RabbitMQ
auto_delete (bool) – Remove when no more queues are bound to it
internal (bool) – Can only be published to by other exchanges
arguments (dict) – Custom key/value pair arguments for the exchange
- Returns
Method frame from the Exchange.Declare-ok response
- Return type
pika.frame.Method having method attribute of type spec.Exchange.DeclareOk
- exchange_unbind(destination=None, source=None, routing_key='', arguments=None)
Unbind an exchange from another exchange.
- Parameters
- Returns
Method frame from the Exchange.Unbind-ok response
- Return type
pika.frame.Method having method attribute of type spec.Exchange.UnbindOk
- flow(active)
Turn Channel flow control off and on.
NOTE: RabbitMQ doesn’t support active=False; per https://www.rabbitmq.com/specification.html: “active=false is not supported by the server. Limiting prefetch with basic.qos provides much better control”
For more information, please reference:
http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
- get_waiting_message_count()
Returns the number of messages that may be retrieved from the current queue consumer generator via BlockingChannel.consume without blocking. NEW in pika 0.10.0
- Returns
The number of waiting messages
- Return type
int
- queue_bind(queue, exchange, routing_key=None, arguments=None)
Bind the queue to the specified exchange
- Parameters
- Returns
Method frame from the Queue.Bind-ok response
- Return type
pika.frame.Method having method attribute of type spec.Queue.BindOk
- queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)
Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
Use an empty string as the queue name for the broker to auto-generate one. Retrieve this auto-generated queue name from the returned spec.Queue.DeclareOk method frame.
- Parameters
queue (str) – The queue name; if empty string, the broker will create a unique queue name
passive (bool) – Only check to see if the queue exists and raise ChannelClosed if it doesn’t
durable (bool) – Survive reboots of the broker
exclusive (bool) – Only allow access by the current connection
auto_delete (bool) – Delete after consumer cancels or disconnects
arguments (dict) – Custom key/value arguments for the queue
- Returns
Method frame from the Queue.Declare-ok response
- Return type
pika.frame.Method having method attribute of type spec.Queue.DeclareOk
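Retrieving the broker-generated queue name from the returned frame can be sketched as below. The helper name declare_temporary_queue is hypothetical; the exchange passed in is assumed to exist already.

```python
def declare_temporary_queue(channel, exchange, routing_key):
    """Declare a broker-named, exclusive queue and bind it to exchange.

    Returns the queue name generated by the broker.
    """
    # An empty queue name asks the broker to generate a unique one.
    result = channel.queue_declare(queue="", exclusive=True)
    # result is a method frame; its method member is spec.Queue.DeclareOk.
    queue_name = result.method.queue
    channel.queue_bind(queue=queue_name, exchange=exchange,
                       routing_key=routing_key)
    return queue_name
```

The returned name can then be passed to basic_consume or consume on the same connection, since an exclusive queue only allows access by the connection that declared it.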
- queue_purge(queue)
Purge all of the messages from the specified queue
- Parameters
queue (str) – The queue to purge
- Returns
Method frame from the Queue.Purge-ok response
- Return type
pika.frame.Method having method attribute of type spec.Queue.PurgeOk
- queue_unbind(queue, exchange=None, routing_key=None, arguments=None)
Unbind a queue from an exchange.
- Parameters
- Returns
Method frame from the Queue.Unbind-ok response
- Return type
pika.frame.Method having method attribute of type spec.Queue.UnbindOk
- start_consuming()
Processes I/O events and dispatches timers and basic_consume callbacks until all consumers are cancelled.
NOTE: this blocking function may not be called from the scope of a pika callback, because dispatching basic_consume callbacks from this context would constitute recursion.
- Raises
pika.exceptions.ReentrancyError – if called from the scope of a BlockingConnection or BlockingChannel callback
ChannelClosed – when this channel is closed by broker.
- stop_consuming(consumer_tag=None)
Cancels all consumers, signalling the start_consuming loop to exit.
NOTE: pending non-ackable messages will be lost; pending ackable messages will be rejected.
- tx_commit()
Commit a transaction.
- Returns
Method frame from the Tx.Commit-ok response
- Return type
pika.frame.Method having method attribute of type spec.Tx.CommitOk
- tx_rollback()
Roll back a transaction.
- Returns
Method frame from the Tx.Rollback-ok response
- Return type
pika.frame.Method having method attribute of type spec.Tx.RollbackOk
- tx_select()
Select standard transaction mode. This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
- Returns
Method frame from the Tx.Select-ok response
- Return type
pika.frame.Method having method attribute of type spec.Tx.SelectOk
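The three transaction methods are usually combined in a select, publish, commit sequence with a rollback on failure. A minimal sketch follows; publish_in_transaction is a hypothetical helper, and it assumes the channel is still open when the rollback is attempted.

```python
def publish_in_transaction(channel, exchange, routing_key, bodies):
    """Publish all bodies atomically: commit on success, roll back on error."""
    # The channel must be put in transaction mode before Commit or Rollback.
    channel.tx_select()
    try:
        for body in bodies:
            channel.basic_publish(exchange=exchange, routing_key=routing_key,
                                  body=body)
        channel.tx_commit()
    except Exception:
        # Discard everything published since tx_select, then re-raise.
        channel.tx_rollback()
        raise
```

Either every message in bodies becomes visible to consumers after tx_commit, or none of them does after tx_rollback.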