This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for celery.utils.log
"""Logging utilities."""
import logging
import numbers
import os
import sys
import threading
import traceback
from contextlib import contextmanager
from typing import AnyStr, Sequence
from kombu.log import LOG_LEVELS
from kombu.log import get_logger as _get_logger
from kombu.utils.encoding import safe_str
from .term import colored
# Names exported by ``from celery.utils.log import *``.
__all__ = (
'ColorFormatter', 'LoggingProxy', 'base_logger',
'set_in_sighandler', 'in_sighandler', 'get_logger',
'get_task_logger', 'mlevel',
'get_multiprocessing_logger', 'reset_multiprocessing_logger', 'LOG_LEVELS'
)
# NOTE(review): not referenced anywhere in the code visible here -- confirm
# whether other modules still read or set it.
_process_aware = False
# Module-wide flag: written by set_in_sighandler() and read by
# LoggingProxy.write(), which prints to sys.__stderr__ while it is true.
_in_sighandler = False
# Raw value of the ``MP_LOG`` environment variable (a string when set),
# or False when the variable is absent.
MP_LOG = os.environ.get('MP_LOG', False)
# Logger names that get_task_logger() refuses to hand out.
RESERVED_LOGGER_NAMES = {'celery', 'celery.task'}
# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')
def set_in_sighandler(value):
    """Set the flag signifying that we're inside a signal handler.

    Arguments:
        value: New value for the module-level ``_in_sighandler`` flag
            (:func:`in_sighandler` passes :const:`True` / :const:`False`).
    """
    global _in_sighandler
    _in_sighandler = value
def iter_open_logger_fds():
    """Yield the ``stream`` of every handler attached to a known logger.

    Walks every entry registered with the logging manager plus the root
    logger.  Each handler is reported at most once, even when it is
    attached to several loggers, and handlers without a ``stream``
    attribute are skipped.
    """
    seen = set()
    candidates = [
        *logging.Logger.manager.loggerDict.values(),
        logging.getLogger(None),
    ]
    for candidate in candidates:
        # PlaceHolder entries in loggerDict do not have handlers.
        for handler in getattr(candidate, 'handlers', ()):
            if handler in seen:  # pragma: no cover
                continue
            # Only the attribute lookup is guarded, so an AttributeError
            # raised by the consumer is never swallowed here.
            try:
                stream = handler.stream
            except AttributeError:
                continue
            yield stream
            seen.add(handler)
@contextmanager
def in_sighandler():
    """Context that records that we are in a signal handler.

    The flag is set to :const:`True` on entry and reset to :const:`False`
    on exit, even when the body raises.  The previous value is not
    restored, so nested use clears the flag when the inner context exits.
    """
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(False)
def logger_isa(l, p, max=1000):
    """Return whether logger ``p`` is ``l`` itself or one of its ancestors.

    Arguments:
        l: Logger whose ``parent`` chain is walked.
        p: Logger to look for in that chain.
        max (int): Maximum number of chain links to inspect.

    Raises:
        RuntimeError: If the parent chain loops back on itself, or if it
            is still not exhausted after ``max`` steps.
    """
    visited = set()
    current = l
    for _ in range(max):
        if current == p:
            return True
        if current in visited:
            raise RuntimeError(
                f'Logger {l.name!r} parents recursive',
            )
        visited.add(current)
        current = current.parent
        if not current:
            # Walked off the top of the hierarchy without meeting ``p``.
            return False
    # The chain was still going after ``max`` steps.
    raise RuntimeError(f'Logger hierarchy exceeds {max}')
def _using_logger_parent(parent_logger, logger_):
    """Make sure ``logger_`` descends from ``parent_logger``, and return it.

    When ``parent_logger`` is not already ``logger_`` or one of its
    ancestors, ``logger_.parent`` is replaced with ``parent_logger``.
    """
    if not logger_isa(logger_, parent_logger):
        logger_.parent = parent_logger
    return logger_
def get_logger(name):
    """Get logger by name.

    The logger is re-parented under ``base_logger`` unless it is
    ``base_logger`` itself, or it (or its direct parent) is the root
    logger.
    """
    found = _get_logger(name)
    if logging.root not in (found, found.parent) and found is not base_logger:
        found = _using_logger_parent(base_logger, found)
    return found
# Logger that get_task_logger() installs as an ancestor of task loggers.
task_logger = get_logger('celery.task')
# Logger named 'celery.worker'; not used by the code visible here.
worker_logger = get_logger('celery.worker')
def get_task_logger(name):
    """Get logger for task module by name.

    The returned logger is guaranteed to have ``task_logger`` among its
    ancestors.

    Raises:
        RuntimeError: If ``name`` is one of ``RESERVED_LOGGER_NAMES``.
    """
    if name in RESERVED_LOGGER_NAMES:
        raise RuntimeError(f'Logger name {name!r} is reserved!')
    return _using_logger_parent(task_logger, get_logger(name))
def mlevel(level):
    """Convert level name/int to log level.

    Falsy values and integers are returned unchanged; anything else is
    upper-cased and looked up in ``LOG_LEVELS``.

    Raises:
        KeyError: If the upper-cased name is not in ``LOG_LEVELS``.
    """
    if not level or isinstance(level, numbers.Integral):
        return level
    return LOG_LEVELS[level.upper()]
class ColorFormatter(logging.Formatter):
    """Logging formatter that adds colors based on severity."""

    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {
        'DEBUG': COLORS['blue'],
        'WARNING': COLORS['yellow'],
        'ERROR': COLORS['red'],
        'CRITICAL': COLORS['magenta'],
    }

    def __init__(self, fmt=None, use_color=True):
        """Initialize the formatter.

        Arguments:
            fmt (str): Format string passed on to
                :class:`logging.Formatter`.
            use_color (bool): When false, :meth:`format` never applies
                colors.
        """
        super().__init__(fmt)
        self.use_color = use_color

    def formatException(self, ei):
        """Format exception info, accepting a non-tuple truthy flag.

        A truthy ``ei`` that is not a tuple (:meth:`format` uses ``1``)
        is replaced by the exception currently being handled.
        """
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        return super().formatException(ei)

    def format(self, record):
        """Format ``record``, colored by its level name when enabled.

        Levels without an entry in :attr:`colors` are returned uncolored.
        If coloring fails, a placeholder message describing the failure
        is formatted instead.
        """
        msg = super().format(record)
        color = self.colors.get(record.levelname)

        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info

        if color and self.use_color:
            try:
                # safe_str will repr the color object
                # and color will break on non-string objects
                # so need to reorder calls based on type.
                # Issue #427
                try:
                    if isinstance(msg, str):
                        return str(color(safe_str(msg)))
                    return safe_str(color(msg))
                except UnicodeDecodeError:  # pragma: no cover
                    return safe_str(msg)  # skip colors
            except Exception as exc:  # pylint: disable=broad-except
                # Swap in a placeholder message and flag the record so the
                # active exception is included in the formatted output.
                prev_msg, record.exc_info, record.msg = (
                    record.msg, 1, '<Unrepresentable {!r}: {!r}>'.format(
                        type(msg), exc
                    ),
                )
                try:
                    return super().format(record)
                finally:
                    # Restore the record for any other handlers.
                    record.msg, record.exc_info = prev_msg, einfo
        else:
            return safe_str(msg)
class LoggingProxy:
    """Forward file object to :class:`logging.Logger` instance.

    Arguments:
        logger (~logging.Logger): Logger instance to forward to.
        loglevel (int, str): Log level to use when logging messages.
    """

    mode = 'w'
    name = None
    closed = False
    loglevel = logging.ERROR
    # Defined on the class, so the recursion guard used by write() is
    # shared by every proxy instance within a thread.
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        # pylint: disable=redefined-outer-name
        # Note that the logger global is redefined here, be careful changing.
        self.logger = logger
        # First truthy value wins: explicit argument, then the logger's own
        # level, then the class default.
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        # Make the logger handlers dump internal errors to
        # :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
        # infinite loops.

        def wrap_handler(handler):  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    try:
                        traceback.print_exc(None, sys.__stderr__)
                    except OSError:
                        pass  # see python issue 5971

            handler.handleError = WithSafeHandleError().handleError

        return [wrap_handler(h) for h in self.logger.handlers]

    def write(self, data):
        # type: (AnyStr) -> int
        """Write message to logging object.

        Returns:
            int: Length of the text that was forwarded, or 0 when nothing
            was logged.
        """
        if _in_sighandler:
            # Inside a signal handler: skip the logger and print to the
            # original stderr.
            safe_data = safe_str(data)
            print(safe_data, file=sys.__stderr__)
            return len(safe_data)
        if getattr(self._thread, 'recurse_protection', False):
            # Logger is logging back to this file, so stop recursing.
            return 0
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                safe_data = safe_str(data).rstrip('\n')
                if safe_data:
                    self.logger.log(self.loglevel, safe_data)
                    return len(safe_data)
            finally:
                self._thread.recurse_protection = False
        return 0

    def writelines(self, sequence):
        # type: (Sequence[str]) -> None
        """Write list of strings to file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.
        """
        for part in sequence:
            self.write(part)

    def flush(self):
        # This object is not buffered so any :meth:`flush`
        # requests are ignored.
        pass

    def close(self):
        # when the object is closed, no write requests are
        # forwarded to the logging object anymore.
        self.closed = True
def get_multiprocessing_logger():
    """Return the multiprocessing logger.

    Returns:
        The result of ``billiard.util.get_logger()``, or :const:`None`
        when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return None
    return util.get_logger()
def reset_multiprocessing_logger():
    """Reset multiprocessing logging setup.

    Clears ``billiard.util._logger`` when that attribute exists.  Does
    nothing when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        return
    if hasattr(util, '_logger'):  # pragma: no cover
        util._logger = None
def current_process():
    """Return billiard's current process object.

    Returns:
        The result of ``billiard.process.current_process()``, or
        :const:`None` when :pypi:`billiard` cannot be imported.
    """
    try:
        from billiard import process
    except ImportError:  # pragma: no cover
        return None
    return process.current_process()
def current_process_index(base=1):
    """Return the current process' ``index`` attribute offset by ``base``.

    Arguments:
        base (int): Value added to the process index.

    Returns:
        ``index + base``, or :const:`None` when the current process has
        no ``index`` attribute (or the attribute is :const:`None`).
    """
    index = getattr(current_process(), 'index', None)
    if index is None:
        return None
    return index + base