This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for celery.apps.beat
"""Beat command-line program.
This module is the 'program-version' of :mod:`celery.beat`.
It does everything necessary to run that module
as an actual application, like installing signal handlers
and so on.
"""
import numbers
import socket
import sys
from datetime import datetime
from celery import VERSION_BANNER, beat, platforms
from celery.utils.imports import qualname
from celery.utils.log import LOG_LEVELS, get_logger
from celery.utils.time import humanize_seconds
__all__ = ('Beat',)
STARTUP_INFO_FMT = """
LocalTime -> {timestamp}
Configuration ->
. broker -> {conninfo}
. loader -> {loader}
. scheduler -> {scheduler}
{scheduler_info}
. logfile -> {logfile}@%{loglevel}
. maxinterval -> {hmax_interval} ({max_interval}s)
""".strip()
logger = get_logger('celery.beat')
[docs]class Beat:
"""Beat as a service."""
Service = beat.Service
app = None
def __init__(self, max_interval=None, app=None,
socket_timeout=30, pidfile=None, no_color=None,
loglevel='WARN', logfile=None, schedule=None,
scheduler=None,
scheduler_cls=None, # XXX use scheduler
redirect_stdouts=None,
redirect_stdouts_level=None, **kwargs):
self.app = app = app or self.app
either = self.app.either
self.loglevel = loglevel
self.logfile = logfile
self.schedule = either('beat_schedule_filename', schedule)
self.scheduler_cls = either(
'beat_scheduler', scheduler, scheduler_cls)
self.redirect_stdouts = either(
'worker_redirect_stdouts', redirect_stdouts)
self.redirect_stdouts_level = either(
'worker_redirect_stdouts_level', redirect_stdouts_level)
self.max_interval = max_interval
self.socket_timeout = socket_timeout
self.no_color = no_color
self.colored = app.log.colored(
self.logfile,
enabled=not no_color if no_color is not None else no_color,
)
self.pidfile = pidfile
if not isinstance(self.loglevel, numbers.Integral):
self.loglevel = LOG_LEVELS[self.loglevel.upper()]
[docs] def run(self):
print(str(self.colored.cyan(
f'celery beat v{VERSION_BANNER} is starting.')))
self.init_loader()
self.set_process_title()
self.start_scheduler()
[docs] def setup_logging(self, colorize=None):
if colorize is None and self.no_color is not None:
colorize = not self.no_color
self.app.log.setup(self.loglevel, self.logfile,
self.redirect_stdouts, self.redirect_stdouts_level,
colorize=colorize)
[docs] def start_scheduler(self):
if self.pidfile:
platforms.create_pidlock(self.pidfile)
service = self.Service(
app=self.app,
max_interval=self.max_interval,
scheduler_cls=self.scheduler_cls,
schedule_filename=self.schedule,
)
print(self.banner(service))
self.setup_logging()
if self.socket_timeout:
logger.debug('Setting default socket timeout to %r',
self.socket_timeout)
socket.setdefaulttimeout(self.socket_timeout)
try:
self.install_sync_handler(service)
service.start()
except Exception as exc:
logger.critical('beat raised exception %s: %r',
exc.__class__, exc,
exc_info=True)
raise
[docs] def banner(self, service):
c = self.colored
return str(
c.blue('__ ', c.magenta('-'),
c.blue(' ... __ '), c.magenta('-'),
c.blue(' _\n'),
c.reset(self.startup_info(service))),
)
[docs] def init_loader(self):
# Run the worker init handler.
# (Usually imports task modules and such.)
self.app.loader.init_worker()
self.app.finalize()
[docs] def startup_info(self, service):
scheduler = service.get_scheduler(lazy=True)
return STARTUP_INFO_FMT.format(
conninfo=self.app.connection().as_uri(),
timestamp=datetime.now().replace(microsecond=0),
logfile=self.logfile or '[stderr]',
loglevel=LOG_LEVELS[self.loglevel],
loader=qualname(self.app.loader),
scheduler=qualname(scheduler),
scheduler_info=scheduler.info,
hmax_interval=humanize_seconds(scheduler.max_interval),
max_interval=scheduler.max_interval,
)
[docs] def set_process_title(self):
arg_start = 'manage' in sys.argv[0] and 2 or 1
platforms.set_process_title(
'celery beat', info=' '.join(sys.argv[arg_start:]),
)
[docs] def install_sync_handler(self, service):
"""Install a `SIGTERM` + `SIGINT` handler saving the schedule."""
def _sync(signum, frame):
service.sync()
raise SystemExit()
platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)