"""Use pika with the Gevent IOLoop."""
import functools
import logging
import os
import threading
import weakref
try:
import queue
except ImportError: # Python <= v2.7
import Queue as queue
import gevent
import gevent.hub
import gevent.socket
import pika.compat
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.io_services_utils import check_callback_arg
from pika.adapters.utils.nbio_interface import (
AbstractIOReference,
AbstractIOServices,
)
from pika.adapters.utils.selector_ioloop_adapter import (
AbstractSelectorIOLoop,
SelectorIOServicesAdapter,
)
LOGGER = logging.getLogger(__name__)
[docs]
class GeventConnection(BaseConnection):
"""Implementation of pika's ``BaseConnection``.
An async selector-based connection which integrates with Gevent.
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
custom_ioloop=None,
internal_connection_workflow=True):
"""Create a new GeventConnection instance and connect to RabbitMQ on
Gevent's event-loop.
:param pika.connection.Parameters|None parameters: The connection
parameters
:param callable|None on_open_callback: The method to call when the
connection is open
:param callable|None on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`:
on_open_error_callback(Connection, exception)
:param callable|None on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the
cause of connection failure
:param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
custom_ioloop: Use a custom Gevent ILoop.
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory
"""
if pika.compat.ON_WINDOWS:
raise RuntimeError('GeventConnection is not supported on Windows.')
custom_ioloop = (custom_ioloop or
_GeventSelectorIOLoop(gevent.get_hub()))
if isinstance(custom_ioloop, AbstractIOServices):
nbio = custom_ioloop
else:
nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)
super(GeventConnection, self).__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
[docs]
@classmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
custom_ioloop = (custom_ioloop or
_GeventSelectorIOLoop(gevent.get_hub()))
nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)
class _TSafeCallbackQueue(object):
"""Dispatch callbacks from any thread to be executed in the main thread
efficiently with IO events.
"""
def __init__(self):
"""
:param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
"""
# Thread-safe, blocking queue.
self._queue = queue.Queue()
# PIPE to trigger an event when the queue is ready.
self._read_fd, self._write_fd = os.pipe()
# Lock around writes to the PIPE in case some platform/implementation
# requires this.
self._write_lock = threading.RLock()
@property
def fd(self):
"""The file-descriptor to register for READ events in the IO loop."""
return self._read_fd
def add_callback_threadsafe(self, callback):
"""Add an item to the queue from any thread. The configured handler
will be invoked with the item in the main thread.
:param item: Object to add to the queue.
"""
self._queue.put(callback)
with self._write_lock:
# The value written is not important.
os.write(self._write_fd, b'\xFF')
def run_next_callback(self):
"""Invoke the next callback from the queue.
MUST run in the main thread. If no callback was added to the queue,
this will block the IO loop.
Performs a blocking READ on the pipe so must only be called when the
pipe is ready for reading.
"""
try:
callback = self._queue.get_nowait()
except queue.Empty:
# Should never happen.
LOGGER.warning("Callback queue was empty.")
else:
# Read the byte from the pipe so the event doesn't re-fire.
os.read(self._read_fd, 1)
callback()
class _GeventSelectorIOLoop(AbstractSelectorIOLoop):
"""Implementation of `AbstractSelectorIOLoop` using the Gevent event loop.
Required by implementations of `SelectorIOServicesAdapter`.
"""
# Gevent's READ and WRITE masks are defined as 1 and 2 respectively. No
# ERROR mask is defined.
# See http://www.gevent.org/api/gevent.hub.html#gevent._interfaces.ILoop.io
READ = 1
WRITE = 2
ERROR = 0
def __init__(self, gevent_hub=None):
"""
:param gevent._interfaces.ILoop gevent_loop:
"""
self._hub = gevent_hub or gevent.get_hub()
self._io_watchers_by_fd = {}
# Used to start/stop the loop.
self._waiter = gevent.hub.Waiter()
# For adding callbacks from other threads. See `add_callback(..)`.
self._callback_queue = _TSafeCallbackQueue()
def run_callback_in_main_thread(fd, events):
"""Swallow the fd and events arguments."""
del fd
del events
self._callback_queue.run_next_callback()
self.add_handler(self._callback_queue.fd, run_callback_in_main_thread,
self.READ)
def close(self):
"""Release the loop's resources."""
self._hub.loop.destroy()
self._hub = None
def start(self):
"""Run the I/O loop. It will loop until requested to exit. See `stop()`.
"""
LOGGER.debug("Passing control to Gevent's IOLoop")
self._waiter.get() # Block until 'stop()' is called.
LOGGER.debug("Control was passed back from Gevent's IOLoop")
self._waiter.clear()
def stop(self):
"""Request exit from the ioloop. The loop is NOT guaranteed to
stop before this method returns.
To invoke `stop()` safely from a thread other than this IOLoop's thread,
call it via `add_callback_threadsafe`; e.g.,
`ioloop.add_callback(ioloop.stop)`
"""
self._waiter.switch(None)
def add_callback(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this IOLoop's thread.
NOTE: This is the only thread-safe method in IOLoop. All other
manipulations of IOLoop must be performed from the IOLoop's thread.
For example, a thread may request a call to the `stop` method of an
ioloop that is running in a different thread via
`ioloop.add_callback_threadsafe(ioloop.stop)`
:param callable callback: The callback method
"""
if gevent.get_hub() == self._hub:
# We're in the main thread; just add the callback.
LOGGER.debug("Adding callback from main thread")
self._hub.loop.run_callback(callback)
else:
# This isn't the main thread and Gevent's hub/loop don't provide
# any thread-safety so enqueue the callback for it to be registered
# in the main thread.
LOGGER.debug("Adding callback from another thread")
callback = functools.partial(self._hub.loop.run_callback, callback)
self._callback_queue.add_callback_threadsafe(callback)
def call_later(self, delay, callback):
"""Add the callback to the IOLoop timer to be called after delay seconds
from the time of call on best-effort basis. Returns a handle to the
timeout.
:param float delay: The number of seconds to wait to call callback
:param callable callback: The callback method
:returns: handle to the created timeout that may be passed to
`remove_timeout()`
:rtype: object
"""
timer = self._hub.loop.timer(delay)
timer.start(callback)
return timer
def remove_timeout(self, timeout_handle):
"""Remove a timeout
:param timeout_handle: Handle of timeout to remove
"""
timeout_handle.close()
def add_handler(self, fd, handler, events):
"""Start watching the given file descriptor for events
:param int fd: The file descriptor
:param callable handler: When requested event(s) occur,
`handler(fd, events)` will be called.
:param int events: The event mask (READ|WRITE)
"""
io_watcher = self._hub.loop.io(fd, events)
self._io_watchers_by_fd[fd] = io_watcher
io_watcher.start(handler, fd, events)
def update_handler(self, fd, events):
"""Change the events being watched for.
:param int fd: The file descriptor
:param int events: The new event mask (READ|WRITE)
"""
io_watcher = self._io_watchers_by_fd[fd]
# Save callback from the original watcher. The close the old watcher
# and create a new one using the saved callback and the new events.
callback = io_watcher.callback
io_watcher.close()
del self._io_watchers_by_fd[fd]
self.add_handler(fd, callback, events)
def remove_handler(self, fd):
"""Stop watching the given file descriptor for events
:param int fd: The file descriptor
"""
io_watcher = self._io_watchers_by_fd[fd]
io_watcher.close()
del self._io_watchers_by_fd[fd]
class _GeventSelectorIOServicesAdapter(SelectorIOServicesAdapter):
"""SelectorIOServicesAdapter implementation using Gevent's DNS resolver."""
def getaddrinfo(self,
host,
port,
on_done,
family=0,
socktype=0,
proto=0,
flags=0):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
"""
resolver = _GeventAddressResolver(native_loop=self._loop,
host=host,
port=port,
family=family,
socktype=socktype,
proto=proto,
flags=flags,
on_done=on_done)
resolver.start()
# Return needs an implementation of `AbstractIOReference`.
return _GeventIOLoopIOHandle(resolver)
class _GeventIOLoopIOHandle(AbstractIOReference):
"""Implement `AbstractIOReference`.
Only used to wrap the _GeventAddressResolver.
"""
def __init__(self, subject):
"""
:param subject: subject of the reference containing a `cancel()` method
"""
self._cancel = subject.cancel
def cancel(self):
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._cancel()
class _GeventAddressResolver(object):
"""Performs getaddrinfo asynchronously Gevent's configured resolver in a
separate greenlet and invoking the provided callback with the result.
See: http://www.gevent.org/dns.html
"""
__slots__ = (
'_loop',
'_on_done',
'_greenlet',
# getaddrinfo(..) args:
'_ga_host',
'_ga_port',
'_ga_family',
'_ga_socktype',
'_ga_proto',
'_ga_flags')
def __init__(self, native_loop, host, port, family, socktype, proto, flags,
on_done):
"""Initialize the `_GeventAddressResolver`.
:param AbstractSelectorIOLoop native_loop:
:param host: `see socket.getaddrinfo()`
:param port: `see socket.getaddrinfo()`
:param family: `see socket.getaddrinfo()`
:param socktype: `see socket.getaddrinfo()`
:param proto: `see socket.getaddrinfo()`
:param flags: `see socket.getaddrinfo()`
:param on_done: on_done(records|BaseException) callback for reporting
result from the given I/O loop. The single arg will be either an
exception object (check for `BaseException`) in case of failure or
the result returned by `socket.getaddrinfo()`.
"""
check_callback_arg(on_done, 'on_done')
self._loop = native_loop
self._on_done = on_done
# Reference to the greenlet performing `getaddrinfo`.
self._greenlet = None
# getaddrinfo(..) args.
self._ga_host = host
self._ga_port = port
self._ga_family = family
self._ga_socktype = socktype
self._ga_proto = proto
self._ga_flags = flags
def start(self):
"""Start an asynchronous getaddrinfo invocation."""
if self._greenlet is None:
self._greenlet = gevent.spawn_raw(self._resolve)
else:
LOGGER.warning("_GeventAddressResolver already started")
def cancel(self):
"""Cancel the pending resolver."""
changed = False
if self._greenlet is not None:
changed = True
self._stop_greenlet()
self._cleanup()
return changed
def _cleanup(self):
"""Stop the resolver and release any resources."""
self._stop_greenlet()
self._loop = None
self._on_done = None
def _stop_greenlet(self):
"""Stop the greenlet performing getaddrinfo if running.
Otherwise, this is a no-op.
"""
if self._greenlet is not None:
gevent.kill(self._greenlet)
self._greenlet = None
def _resolve(self):
"""Call `getaddrinfo()` and return result via user's callback
function on the configured IO loop.
"""
try:
# NOTE(JG): Can't use kwargs with getaddrinfo on Python <= v2.7.
result = gevent.socket.getaddrinfo(self._ga_host, self._ga_port,
self._ga_family,
self._ga_socktype,
self._ga_proto, self._ga_flags)
except Exception as exc: # pylint: disable=broad-except
LOGGER.error('Address resolution failed: %r', exc)
result = exc
callback = functools.partial(self._dispatch_callback, result)
self._loop.add_callback(callback)
def _dispatch_callback(self, result):
"""Invoke the configured completion callback and any subsequent cleanup.
:param result: result from getaddrinfo, or the exception if raised.
"""
try:
LOGGER.debug(
'Invoking async getaddrinfo() completion callback; host=%r',
self._ga_host)
self._on_done(result)
finally:
self._cleanup()