This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for celery.events.dispatcher
"""Event dispatcher sends events."""
import os
import threading
import time
from collections import defaultdict, deque
from kombu import Producer
from celery.app import app_or_default
from celery.utils.nodenames import anon_nodename
from celery.utils.time import utcoffset
from .event import Event, get_exchange, group_from
# Public API of this module: only the dispatcher class is exported.
__all__ = ('EventDispatcher',)
class EventDispatcher:
    """Dispatches event messages.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        hostname (str): Hostname to identify ourselves as,
            by default uses the hostname returned by
            :func:`~celery.utils.nodenames.anon_nodename`.
        groups (Sequence[str]): List of groups to send events for.
            :meth:`send` will ignore send requests to groups not in this
            list.  If this is :const:`None` (or empty), all events will
            be sent.  Example groups include ``"task"`` and ``"worker"``.
        enabled (bool): Set to :const:`False` to not actually publish any
            events, making :meth:`send` a no-op.
        channel (kombu.Channel): Can be used instead of `connection` to
            specify an exact channel to use when sending events.
        buffer_while_offline (bool): If enabled events will be buffered
            while the connection is down. :meth:`flush` must be called
            as soon as the connection is re-established.
        app (celery.Celery): App instance to use; resolved through
            :func:`~celery.app.app_or_default`.
        serializer (str): Serializer name used for event messages.
            Defaults to the app's ``event_serializer`` setting.
        delivery_mode (int): Delivery mode passed to the producer for
            every event message (default 1 -- presumably the AMQP
            transient mode; confirm against the kombu documentation).
        buffer_group (Set[str]): Groups whose events are accumulated in
            memory by :meth:`send` and published together by
            :meth:`flush` under the routing key ``<group>.multi``.
        buffer_limit (int): Number of buffered events in one group that
            makes :meth:`send` call :meth:`flush` automatically.
        on_send_buffered (Callable): Called (without arguments) by
            :meth:`send` after an event was buffered and the buffer
            limit has not been reached yet.

    Note:
        You need to :meth:`close` this after use.
    """

    # Transport driver types for which event publishing is forced off.
    DISABLED_TRANSPORTS = {'sql'}

    app = None

    # set of callbacks to be called by :meth:`enable`.
    on_enabled = None

    # set of callbacks to be called by :meth:`disable`.
    on_disabled = None

    def __init__(self, connection=None, hostname=None, enabled=True,
                 channel=None, buffer_while_offline=True, app=None,
                 serializer=None, groups=None, delivery_mode=1,
                 buffer_group=None, buffer_limit=24, on_send_buffered=None):
        self.app = app_or_default(app or self.app)
        self.connection = connection
        self.channel = channel
        self.hostname = hostname or anon_nodename()
        self.buffer_while_offline = buffer_while_offline
        self.buffer_group = buffer_group or frozenset()
        self.buffer_limit = buffer_limit
        self.on_send_buffered = on_send_buffered
        # Per-group lists of events waiting to be sent as one message.
        self._group_buffer = defaultdict(list)
        # Serializes calls into the producer (see publish() and flush()).
        self.mutex = threading.Lock()
        self.producer = None
        # (event, routing_key, exception) triples for failed publishes.
        self._outbound_buffer = deque()
        self.serializer = serializer or self.app.conf.event_serializer
        self.on_enabled = set()
        self.on_disabled = set()
        self.groups = set(groups or [])
        self.tzoffset = [-time.timezone, -time.altzone]
        self.clock = self.app.clock
        self.delivery_mode = delivery_mode
        if not connection and channel:
            # Only a channel was given: derive the connection from it.
            self.connection = channel.connection.client
        self.enabled = enabled
        conninfo = self.connection or self.app.connection_for_write()
        self.exchange = get_exchange(conninfo,
                                     name=self.app.conf.event_exchange)
        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
            self.enabled = False
        if self.enabled:
            self.enable()
        self.headers = {'hostname': self.hostname}
        self.pid = os.getpid()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def enable(self):
        """Create the producer, mark as enabled and run ``on_enabled``."""
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()

    def disable(self):
        """Disable and close the dispatcher, then run ``on_disabled``.

        Does nothing if the dispatcher is already disabled.
        """
        if self.enabled:
            self.enabled = False
            self.close()
            for callback in self.on_disabled:
                callback()

    def publish(self, type, fields, producer,
                blind=False, Event=Event, utcoffset=utcoffset, **kwargs):
        """Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        """
        clock = None if blind else self.clock.forward()
        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                      pid=self.pid, clock=clock, **fields)
        with self.mutex:
            # Dashes in the event type become dots in the routing key.
            return self._publish(event, producer,
                                 routing_key=type.replace('-', '.'), **kwargs)

    def _publish(self, event, producer, routing_key, retry=False,
                 retry_policy=None, utcoffset=utcoffset):
        # Low-level publish.  Callers are expected to hold ``self.mutex``.
        # ``utcoffset`` is unused here and kept for signature compatibility.
        exchange = self.exchange
        try:
            producer.publish(
                event,
                routing_key=routing_key,
                exchange=exchange.name,
                retry=retry,
                retry_policy=retry_policy,
                declare=[exchange],
                serializer=self.serializer,
                headers=self.headers,
                delivery_mode=self.delivery_mode,
            )
        except Exception as exc:  # pylint: disable=broad-except
            if not self.buffer_while_offline:
                raise
            # Keep the payload so that flush() can try to send it again.
            self._outbound_buffer.append((event, routing_key, exc))

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
        """Send event.

        Events whose group is in ``buffer_group`` are not published
        immediately: they are appended to the group's buffer, which is
        flushed once it holds ``buffer_limit`` events.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event,
                defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current utc offset
                in hours.
            **fields (Any): Event fields -- must be json serializable.
        """
        if not self.enabled:
            return None
        groups, group = self.groups, group_from(type)
        if groups and group not in groups:
            return None
        if group not in self.buffer_group:
            return self.publish(type, fields, self.producer, blind=blind,
                                Event=Event, utcoffset=utcoffset,
                                retry=retry, retry_policy=retry_policy)
        # Buffered group: honour ``blind`` the same way publish() does.
        clock = None if blind else self.clock.forward()
        event = Event(type, hostname=self.hostname,
                      utcoffset=utcoffset(),
                      pid=self.pid, clock=clock, **fields)
        buf = self._group_buffer[group]
        buf.append(event)
        if len(buf) >= self.buffer_limit:
            self.flush()
        elif self.on_send_buffered:
            self.on_send_buffered()
        return None

    def flush(self, errors=True, groups=True):
        """Flush the outbound buffer.

        Arguments:
            errors (bool): Re-publish the events that previously failed
                to be sent.  The error buffer is cleared afterwards
                regardless of the outcome, so anything that fails again
                during this pass is discarded.
            groups (bool): Publish every non-empty group buffer as a
                single message with routing key ``<group>.multi``.
        """
        if errors:
            pending = list(self._outbound_buffer)
            try:
                with self.mutex:
                    for event, routing_key, _ in pending:
                        self._publish(event, self.producer, routing_key)
            finally:
                self._outbound_buffer.clear()
        if groups:
            with self.mutex:
                for group, events in self._group_buffer.items():
                    if not events:
                        # Nothing pending: don't send an empty message.
                        continue
                    # Publish a snapshot: if the publish fails, _publish
                    # stores the payload for a later retry, and clearing
                    # the live list below must not empty that payload.
                    self._publish(list(events), self.producer,
                                  '%s.multi' % group)
                    events.clear()

    def extend_buffer(self, other):
        """Copy the outbound buffer of another instance."""
        self._outbound_buffer.extend(other._outbound_buffer)

    def close(self):
        """Close the event dispatcher."""
        # Release the mutex if it is currently held, then drop the producer.
        if self.mutex.locked():
            self.mutex.release()
        self.producer = None

    def _get_publisher(self):
        return self.producer

    def _set_publisher(self, producer):
        self.producer = producer
    publisher = property(_get_publisher, _set_publisher)  # XXX compat