Source code for celery.utils.functional
"""Functional-style utilities."""
import inspect
import sys
from collections import UserList
from functools import partial
from itertools import islice, tee, zip_longest
from kombu.utils.functional import (LRUCache, dictfilter, is_list, lazy,
maybe_evaluate, maybe_list, memoize)
from vine import promise
__all__ = (
'LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
'regen', 'dictfilter', 'lazy', 'maybe_evaluate', 'head_from_fun',
'maybe', 'fun_accepts_kwargs',
)
FUNHEAD_TEMPLATE = """
def {fun_name}({fun_args}):
return {fun_value}
"""
class DummyContext:
def __enter__(self):
return self
def __exit__(self, *exc_info):
pass
class mlazy(lazy):
"""Memoized lazy evaluation.
The function is only evaluated once, every subsequent access
will return the same value.
"""
#: Set to :const:`True` after the object has been evaluated.
evaluated = False
_value = None
    def evaluate(self):
if not self.evaluated:
self._value = super().evaluate()
self.evaluated = True
return self._value
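# Editor's usage sketch for ``mlazy`` (illustrative, not part of the module):
# the wrapped callable runs only on the first ``evaluate()``; later calls
# return the cached value.
#
#     >>> counter = iter(range(10))
#     >>> value = mlazy(lambda: next(counter))
#     >>> value.evaluate()   # first access evaluates the function
#     0
#     >>> value.evaluate()   # subsequent access returns the cached value
#     0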
def noop(*args, **kwargs):
"""No operation.
Takes any arguments/keyword arguments and does nothing.
"""
def pass1(arg, *args, **kwargs):
"""Return the first positional argument."""
return arg
def evaluate_promises(it):
for value in it:
if isinstance(value, promise):
value = value()
yield value
def first(predicate, it):
"""Return the first element in ``it`` that ``predicate`` accepts.
If ``predicate`` is None it will return the first item that's not
:const:`None`.
"""
return next(
(v for v in evaluate_promises(it) if (
predicate(v) if predicate is not None else v is not None)),
None,
)
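# Editor's usage sketch for ``first`` (illustrative, not part of the module):
#
#     >>> first(lambda x: x > 3, [1, 2, 3, 4, 5])
#     4
#     >>> first(None, [None, None, 7])   # predicate=None: first non-None item
#     7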
def firstmethod(method, on_call=None):
"""Multiple dispatch.
Return a function that with a list of instances,
finds the first instance that gives a value for the given method.
The list can also contain lazy instances
(:class:`~kombu.utils.functional.lazy`.)
"""
def _matcher(it, *args, **kwargs):
for obj in it:
try:
meth = getattr(maybe_evaluate(obj), method)
reply = (on_call(meth, *args, **kwargs) if on_call
else meth(*args, **kwargs))
except AttributeError:
pass
else:
if reply is not None:
return reply
return _matcher
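# Editor's usage sketch for ``firstmethod`` (illustrative, not part of the
# module; the router classes below are hypothetical):
#
#     >>> class NullRouter:
#     ...     def route_for_task(self, name):
#     ...         return None
#     >>> class QueueRouter:
#     ...     def route_for_task(self, name):
#     ...         return {'queue': name}
#     >>> find_route = firstmethod('route_for_task')
#     >>> find_route([NullRouter(), QueueRouter()], 'video')
#     {'queue': 'video'}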
def chunks(it, n):
"""Split an iterator into chunks with `n` elements each.
Warning:
``it`` must be an actual iterator, if you pass this a
concrete sequence will get you repeating elements.
So ``chunks(iter(range(1000)), 10)`` is fine, but
``chunks(range(1000), 10)`` is not.
Example:
# n == 2
>>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
>>> list(x)
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]
# n == 3
>>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
>>> list(x)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
"""
for item in it:
yield [item] + list(islice(it, n - 1))
def padlist(container, size, default=None):
"""Pad list with default elements.
Example:
>>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3)
('George', 'Costanza', 'NYC')
>>> first, last, city = padlist(['George', 'Costanza'], 3)
('George', 'Costanza', None)
>>> first, last, city, planet = padlist(
... ['George', 'Costanza', 'NYC'], 4, default='Earth',
... )
('George', 'Costanza', 'NYC', 'Earth')
"""
return list(container)[:size] + [default] * (size - len(container))
def mattrgetter(*attrs):
"""Get attributes, ignoring attribute errors.
    Like :func:`operator.attrgetter` but returns :const:`None` on missing
    attributes instead of raising :exc:`AttributeError`.
"""
return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}
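# Editor's usage sketch for ``mattrgetter`` (illustrative, not part of the
# module): missing attributes map to None instead of raising.
#
#     >>> from collections import namedtuple
#     >>> Point = namedtuple('Point', ('x', 'y'))
#     >>> getter = mattrgetter('x', 'y', 'z')
#     >>> getter(Point(1, 2))
#     {'x': 1, 'y': 2, 'z': None}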
def uniq(it):
"""Return all unique elements in ``it``, preserving order."""
seen = set()
return (seen.add(obj) or obj for obj in it if obj not in seen)
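# Editor's usage sketch for ``uniq`` (illustrative, not part of the module):
# ``uniq`` returns a generator, so materialize it with ``list`` if needed.
#
#     >>> list(uniq([1, 2, 1, 3, 2, 4]))
#     [1, 2, 3, 4]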
def lookahead(it):
"""Yield pairs of (current, next) items in `it`.
`next` is None if `current` is the last item.
Example:
>>> list(lookahead(x for x in range(6)))
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, None)]
"""
a, b = tee(it)
next(b, None)
return zip_longest(a, b)
def regen(it):
"""Convert iterator to an object that can be consumed multiple times.
    ``Regen`` takes any iterable, and if the object is a
    generator it will cache the evaluated list on first access,
    so that the generator can be "consumed" multiple times.
"""
if isinstance(it, (list, tuple)):
return it
return _regen(it)
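# Editor's usage sketch for ``regen`` (illustrative, not part of the module):
# a wrapped generator can be iterated more than once because consumed items
# are cached in the ``_regen`` proxy defined below.
#
#     >>> g = regen(x * 2 for x in range(3))
#     >>> list(g)
#     [0, 2, 4]
#     >>> list(g)   # a bare generator would already be exhausted here
#     [0, 2, 4]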
class _regen(UserList, list):
    # must be a subclass of list so that json can encode it.
def __init__(self, it):
# pylint: disable=super-init-not-called
# UserList creates a new list and sets .data, so we don't
# want to call init here.
self.__it = it
self.__consumed = []
self.__done = False
def __reduce__(self):
return list, (self.data,)
def __length_hint__(self):
return self.__it.__length_hint__()
def __lookahead_consume(self, limit=None):
if not self.__done and (limit is None or limit > 0):
it = iter(self.__it)
try:
now = next(it)
except StopIteration:
return
self.__consumed.append(now)
# Maintain a single look-ahead to ensure we set `__done` when the
# underlying iterator gets exhausted
while not self.__done:
try:
next_ = next(it)
self.__consumed.append(next_)
except StopIteration:
self.__done = True
break
finally:
yield now
now = next_
# We can break out when `limit` is exhausted
if limit is not None:
limit -= 1
if limit <= 0:
break
def __iter__(self):
yield from self.__consumed
yield from self.__lookahead_consume()
def __getitem__(self, index):
if index < 0:
return self.data[index]
# Consume elements up to the desired index prior to attempting to
# access it from within `__consumed`
consume_count = index - len(self.__consumed) + 1
for _ in self.__lookahead_consume(limit=consume_count):
pass
return self.__consumed[index]
def __bool__(self):
if len(self.__consumed):
return True
try:
next(iter(self))
except StopIteration:
return False
else:
return True
@property
def data(self):
if not self.__done:
self.__consumed.extend(self.__it)
self.__done = True
return self.__consumed
def __repr__(self):
return "<{}: [{}{}]>".format(
self.__class__.__name__,
", ".join(repr(e) for e in self.__consumed),
"..." if not self.__done else "",
)
def _argsfromspec(spec, replace_defaults=True):
if spec.defaults:
split = len(spec.defaults)
defaults = (list(range(len(spec.defaults))) if replace_defaults
else spec.defaults)
positional = spec.args[:-split]
optional = list(zip(spec.args[-split:], defaults))
else:
positional, optional = spec.args, []
varargs = spec.varargs
varkw = spec.varkw
if spec.kwonlydefaults:
kwonlyargs = set(spec.kwonlyargs) - set(spec.kwonlydefaults.keys())
if replace_defaults:
kwonlyargs_optional = [
(kw, i) for i, kw in enumerate(spec.kwonlydefaults.keys())
]
else:
kwonlyargs_optional = list(spec.kwonlydefaults.items())
else:
kwonlyargs, kwonlyargs_optional = spec.kwonlyargs, []
return ', '.join(filter(None, [
', '.join(positional),
', '.join(f'{k}={v}' for k, v in optional),
f'*{varargs}' if varargs else None,
'*' if (kwonlyargs or kwonlyargs_optional) and not varargs else None,
', '.join(kwonlyargs) if kwonlyargs else None,
', '.join(f'{k}="{v}"' for k, v in kwonlyargs_optional),
f'**{varkw}' if varkw else None,
]))
def head_from_fun(fun, bound=False, debug=False):
"""Generate signature function from actual function."""
    # we could use inspect.Signature here, but that implementation
    # is very slow since it implements the argument checking
    # in pure-Python.  Instead we use exec to create a new function
    # with an empty body, meaning it has the same performance
    # as just calling a function.
is_function = inspect.isfunction(fun)
is_callable = hasattr(fun, '__call__')
is_cython = fun.__class__.__name__ == 'cython_function_or_method'
is_method = inspect.ismethod(fun)
if not is_function and is_callable and not is_method and not is_cython:
name, fun = fun.__class__.__name__, fun.__call__
else:
name = fun.__name__
definition = FUNHEAD_TEMPLATE.format(
fun_name=name,
fun_args=_argsfromspec(inspect.getfullargspec(fun)),
fun_value=1,
)
if debug: # pragma: no cover
print(definition, file=sys.stderr)
namespace = {'__name__': fun.__module__}
# pylint: disable=exec-used
# Tasks are rarely, if ever, created at runtime - exec here is fine.
exec(definition, namespace)
result = namespace[name]
result._source = definition
if bound:
return partial(result, object())
return result
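# Editor's usage sketch for ``head_from_fun`` (illustrative, not part of the
# module; ``send_email`` is a hypothetical task function): the generated head
# shares the original signature but its body just returns a dummy value, so
# calling it only performs argument checking.
#
#     >>> def send_email(to, subject, body='', *, cc=None):
#     ...     ...
#     >>> head = head_from_fun(send_email)
#     >>> head('user@example.com', 'hi')   # arguments accepted
#     1
#     >>> head('user@example.com')         # missing ``subject``
#     Traceback (most recent call last):
#         ...
#     TypeError: send_email() missing 1 required positional argument: 'subject'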
def arity_greater(fun, n):
argspec = inspect.getfullargspec(fun)
return argspec.varargs or len(argspec.args) > n
def fun_takes_argument(name, fun, position=None):
spec = inspect.getfullargspec(fun)
return (
spec.varkw or spec.varargs or
(len(spec.args) >= position if position else name in spec.args)
)
def fun_accepts_kwargs(fun):
"""Return true if function accepts arbitrary keyword arguments."""
return any(
p for p in inspect.signature(fun).parameters.values()
if p.kind == p.VAR_KEYWORD
)
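# Editor's usage sketch for ``fun_accepts_kwargs`` (illustrative, not part of
# the module):
#
#     >>> def no_kwargs(a, b=1): ...
#     >>> def with_kwargs(a, **extra): ...
#     >>> fun_accepts_kwargs(no_kwargs)
#     False
#     >>> fun_accepts_kwargs(with_kwargs)
#     True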
def maybe(typ, val):
"""Call typ on value if val is defined."""
return typ(val) if val is not None else val
def seq_concat_item(seq, item):
"""Return copy of sequence seq with item added.
Returns:
Sequence: if seq is a tuple, the result will be a tuple,
otherwise it depends on the implementation of ``__add__``.
"""
return seq + (item,) if isinstance(seq, tuple) else seq + [item]
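# Editor's usage sketch for ``seq_concat_item`` (illustrative, not part of the
# module): the result keeps the type of the input sequence.
#
#     >>> seq_concat_item([1, 2], 3)
#     [1, 2, 3]
#     >>> seq_concat_item((1, 2), 3)
#     (1, 2, 3)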
def seq_concat_seq(a, b):
"""Concatenate two sequences: ``a + b``.
Returns:
Sequence: The return value will depend on the largest sequence
- if b is larger and is a tuple, the return value will be a tuple.
- if a is larger and is a list, the return value will be a list,
"""
# find the type of the largest sequence
prefer = type(max([a, b], key=len))
    # convert the smaller sequence to the type of the larger one.
if not isinstance(a, prefer):
a = prefer(a)
if not isinstance(b, prefer):
b = prefer(b)
return a + b
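# Editor's usage sketch for ``seq_concat_seq`` (illustrative, not part of the
# module): the type of the longer sequence wins.
#
#     >>> seq_concat_seq((1, 2, 3), [4])
#     (1, 2, 3, 4)
#     >>> seq_concat_seq((1,), [2, 3, 4])
#     [1, 2, 3, 4]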