Source code for pika.adapters.tornado_connection
"""Use pika with the Tornado IOLoop
"""
import logging
from tornado import ioloop
from pika.adapters.utils import nbio_interface, selector_ioloop_adapter
from pika.adapters import base_connection
LOGGER = logging.getLogger(__name__)
[docs]class TornadoConnection(base_connection.BaseConnection):
"""The TornadoConnection runs on the Tornado IOLoop.
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
custom_ioloop=None,
internal_connection_workflow=True):
"""Create a new instance of the TornadoConnection class, connecting
to RabbitMQ automatically.
:param pika.connection.Parameters|None parameters: The connection
parameters
:param callable|None on_open_callback: The method to call when the
connection is open
:param callable|None on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`:
on_open_error_callback(Connection, exception)
:param callable|None on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the
cause of connection failure
:param ioloop.IOLoop|nbio_interface.AbstractIOServices|None custom_ioloop:
Override using the global IOLoop in Tornado
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory
"""
if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
nbio = custom_ioloop
else:
nbio = (selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance()))
super(TornadoConnection, self).__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
[docs] @classmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Implement
:py:classmethod:`pika.adapters.BaseConnection.create_connection()`.
"""
nbio = selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance())
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(
parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)