This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for celery.events.event
"""Creating events, and event exchange definition."""
import time
from copy import copy
from kombu import Exchange
__all__ = (
'Event', 'event_exchange', 'get_exchange', 'group_from',
)
EVENT_EXCHANGE_NAME = 'celeryev'
#: Exchange used to send events on.
#: Note: Use :func:`get_exchange` instead, as the type of
#: exchange will vary depending on the broker connection.
event_exchange = Exchange(EVENT_EXCHANGE_NAME, type='topic')
[docs]def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
"""Create an event.
Notes:
An event is simply a dictionary: the only required field is ``type``.
A ``timestamp`` field will be set to the current time if not provided.
"""
event = __dict__(_fields, **fields) if _fields else fields
if 'timestamp' not in event:
event.update(timestamp=__now__(), type=type)
else:
event['type'] = type
return event
[docs]def group_from(type):
"""Get the group part of an event type name.
Example:
>>> group_from('task-sent')
'task'
>>> group_from('custom-my-event')
'custom'
"""
return type.split('-', 1)[0]
[docs]def get_exchange(conn, name=EVENT_EXCHANGE_NAME):
"""Get exchange used for sending events.
Arguments:
conn (kombu.Connection): Connection used for sending/receiving events.
name (str): Name of the exchange. Default is ``celeryev``.
Note:
The event type changes if Redis is used as the transport
(from topic -> fanout).
"""
ex = copy(event_exchange)
if conn.transport.driver_type == 'redis':
# quick hack for Issue #436
ex.type = 'fanout'
if name != ex.name:
ex.name = name
return ex