This document describes the current stable version of Celery (5.2). For the development version, see the development documentation.
Source code for celery.utils.collections
"""Custom maps, sets, sequences, and other data structures."""
import time
from collections import OrderedDict as _OrderedDict
from collections import deque
from collections.abc import (Callable, Mapping, MutableMapping, MutableSet,
Sequence)
from heapq import heapify, heappop, heappush
from itertools import chain, count
from queue import Empty
from typing import Any, Dict, Iterable, List
from .functional import first, uniq
from .text import match_case
try:
# pypy: dicts are ordered in recent versions
from __pypy__ import reversed_dict as _dict_is_ordered
except ImportError:
_dict_is_ordered = None
try:
    from django.utils.functional import LazyObject, LazySettings
except ImportError:
    # Django is optional.  Without it, provide a placeholder type so that
    # isinstance() checks against these names (as in force_mapping) never
    # match a real object.
    class LazyObject:
        """Placeholder used when Django is not installed."""

    LazySettings = LazyObject
# Names exported by ``from celery.utils.collections import *``.
__all__ = (
    'AttributeDictMixin',
    'AttributeDict',
    'BufferMap',
    'ChainMap',
    'ConfigurationView',
    'DictAttribute',
    'Evictable',
    'LimitedSet',
    'Messagebuffer',
    'OrderedDict',
    'force_mapping',
    'lpmerge',
)
# Format template for LimitedSet.__repr__; positional field 0 is the set.
REPR_LIMITED_SET = (
    '<{name}({size}): '
    'maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>'
)
def force_mapping(m):
    # type: (Any) -> Mapping
    """Return ``m`` as a mapping, wrapping it in :class:`DictAttribute` if needed.

    Django lazy objects are first replaced by their ``_wrapped`` attribute.
    Objects that already implement :class:`~collections.abc.Mapping` are
    returned unchanged.
    """
    lazy_types = (LazyObject, LazySettings)
    target = m._wrapped if isinstance(m, lazy_types) else m
    if isinstance(target, Mapping):
        return target
    return DictAttribute(target)
def lpmerge(L, R):
    # type: (Mapping, Mapping) -> Mapping
    """In place left precedent dictionary merge.

    Copies every item of `R` into `L`, except items whose value in `R`
    is :const:`None`: for those keys `L` keeps whatever it already had.

    Arguments:
        L (MutableMapping): Mapping updated in place.
        R (Mapping): Mapping whose non-:const:`None` values are merged.

    Returns:
        MutableMapping: `L` itself, after the merge.
    """
    # A plain loop: the previous list comprehension was only used for its
    # side effects and built a throwaway list of None values.
    for key, value in R.items():
        if value is not None:
            L[key] = value
    return L
class OrderedDict(_OrderedDict):
    """Dict where insertion order matters.

    Extends :class:`collections.OrderedDict` with :meth:`_LRUkey`.  Callers
    that keep recently used keys at the end (via the inherited
    :meth:`move_to_end`) get the least recently used key from it.
    """

    # The former Python 2 / old-PyPy ``move_to_end`` fallback was removed:
    # ``collections.OrderedDict.move_to_end`` exists on every Python 3,
    # so that branch could never run.

    def _LRUkey(self):
        # type: () -> Any
        """Return the first (oldest) key without copying the key list.

        Raises:
            StopIteration: if the mapping is empty.
        """
        return next(iter(self))
class AttributeDictMixin:
    """Mixin for Mapping interface that adds attribute access.

    I.e., `d.key -> d[key]`).

    Attribute assignment is routed to item assignment as well, so
    subclasses that need real instance attributes must write them
    through ``self.__dict__``.
    """

    def __getattr__(self, k):
        # type: (str) -> Any
        """`d.key -> d[key]`."""
        try:
            return self[k]
        except KeyError:
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {k!r}')

    def __setattr__(self, key, value):
        # type: (str, Any) -> None
        """`d.key = value -> d[key] = value`."""
        self[key] = value


class AttributeDict(dict, AttributeDictMixin):
    """Dict subclass with attribute access.

    Listed in this module's ``__all__``; without this definition
    ``from celery.utils.collections import *`` raised AttributeError.
    """
class DictAttribute:
    """Dict interface to attributes.

    `obj[k] -> obj.k`
    `obj[k] = val -> obj.k = val`

    Attribute get/set on the wrapper itself is forwarded to the wrapped
    object too; the keys are the names reported by ``dir(obj)``.
    """

    obj = None

    def __init__(self, obj):
        # type: (Any) -> None
        # Use object.__setattr__: our own __setattr__ forwards to ``obj``.
        object.__setattr__(self, 'obj', obj)

    def __getattr__(self, key):
        # type: (Any) -> Any
        return getattr(self.obj, key)

    def __setattr__(self, key, value):
        # type: (Any, Any) -> None
        return setattr(self.obj, key, value)

    def get(self, key, default=None):
        # type: (Any, Any) -> Any
        try:
            return self[key]
        except KeyError:
            return default

    def setdefault(self, key, default=None):
        # type: (Any, Any) -> Any
        """Set attribute ``key`` to ``default`` unless it already exists.

        Returns:
            Any: the attribute's value after the call, matching
                :meth:`dict.setdefault`.
        """
        if key not in self:
            self[key] = default
        return self[key]

    def __getitem__(self, key):
        # type: (Any) -> Any
        try:
            return getattr(self.obj, key)
        except AttributeError:
            raise KeyError(key)

    def __setitem__(self, key, value):
        # type: (Any, Any) -> Any
        setattr(self.obj, key, value)

    def __contains__(self, key):
        # type: (Any) -> bool
        return hasattr(self.obj, key)

    def _iterate_keys(self):
        # type: () -> Iterable
        return iter(dir(self.obj))
    iterkeys = _iterate_keys

    def __iter__(self):
        # type: () -> Iterable
        return self._iterate_keys()

    def _iterate_items(self):
        # type: () -> Iterable
        for key in self._iterate_keys():
            yield key, getattr(self.obj, key)
    iteritems = _iterate_items

    def _iterate_values(self):
        # type: () -> Iterable
        for key in self._iterate_keys():
            yield getattr(self.obj, key)
    itervalues = _iterate_values

    items = _iterate_items
    keys = _iterate_keys
    values = _iterate_values


MutableMapping.register(DictAttribute)
class ChainMap(MutableMapping):
    """Key lookup on a sequence of maps.

    Lookups search ``maps`` in order and return the first hit; writes and
    deletes only touch the first map (``changes``).  The remaining maps
    are the ``defaults``.  When ``key_t`` is set, it is applied exactly
    once to every key before use.
    """

    key_t = None
    changes = None
    defaults = None
    maps = None
    # Class-level fallback only: every instance gets its own list in
    # __init__, so registered observers are never shared between instances.
    _observers = ()

    def __init__(self, *maps, **kwargs):
        # type: (*Mapping, **Any) -> None
        maps = list(maps or [{}])
        # Assign through __dict__ so subclasses that turn attribute
        # assignment into item assignment (e.g. via AttributeDictMixin)
        # still get real instance attributes.
        self.__dict__.update(
            key_t=kwargs.get('key_t'),
            maps=maps,
            changes=maps[0],
            defaults=maps[1:],
            _observers=[],
        )

    def add_defaults(self, d):
        # type: (Mapping) -> None
        """Insert ``d`` as the highest-priority defaults mapping."""
        d = force_mapping(d)
        self.defaults.insert(0, d)
        self.maps.insert(1, d)

    def pop(self, key, *default):
        # type: (Any, *Any) -> Any
        """Remove ``key`` from the first mapping and return its value.

        ``key_t`` is applied to the key, as in :meth:`__setitem__` and
        :meth:`__delitem__`; the defaults are never modified.
        """
        try:
            return self.maps[0].pop(self._key(key), *default)
        except KeyError:
            raise KeyError(
                f'Key not found in the first mapping: {key!r}')

    def __missing__(self, key):
        # type: (Any) -> Any
        raise KeyError(key)

    def _key(self, key):
        # type: (Any) -> Any
        return self.key_t(key) if self.key_t is not None else key

    def __getitem__(self, key):
        # type: (Any) -> Any
        _key = self._key(key)
        for mapping in self.maps:
            try:
                return mapping[_key]
            except KeyError:
                pass
        return self.__missing__(key)

    def __setitem__(self, key, value):
        # type: (Any, Any) -> None
        self.changes[self._key(key)] = value

    def __delitem__(self, key):
        # type: (Any) -> None
        try:
            del self.changes[self._key(key)]
        except KeyError:
            raise KeyError(f'Key not found in first mapping: {key!r}')

    def get(self, key, default=None):
        # type: (Any, Any) -> Any
        try:
            # __getitem__ applies key_t itself; transforming the key here
            # too would apply it twice.
            return self[key]
        except KeyError:
            return default

    def __len__(self):
        # type: () -> int
        return len(set().union(*self.maps))

    def __iter__(self):
        return self._iterate_keys()

    def __contains__(self, key):
        # type: (Any) -> bool
        key = self._key(key)
        return any(key in m for m in self.maps)

    def __bool__(self):
        # type: () -> bool
        return any(self.maps)
    __nonzero__ = __bool__  # Py2

    def setdefault(self, key, default=None):
        # type: (Any, Any) -> Any
        """Set ``key`` to ``default`` in ``changes`` unless it is found.

        Returns:
            Any: the value for ``key`` after the call, like
                :meth:`dict.setdefault`.
        """
        # No pre-transform: __contains__/__setitem__/__getitem__ apply
        # key_t themselves.
        if key not in self:
            self[key] = default
        return self[key]

    def update(self, *args, **kwargs):
        # type: (*Any, **Any) -> Any
        """Update ``changes``, then call each observer with the same args."""
        result = self.changes.update(*args, **kwargs)
        for callback in self._observers:
            callback(*args, **kwargs)
        return result

    def __repr__(self):
        # type: () -> str
        return '{0.__class__.__name__}({1})'.format(
            self, ', '.join(map(repr, self.maps)))

    @classmethod
    def fromkeys(cls, iterable, *args):
        # type: (type, Iterable, *Any) -> 'ChainMap'
        """Create a ChainMap with a single dict created from the iterable."""
        return cls(dict.fromkeys(iterable, *args))

    def copy(self):
        # type: () -> 'ChainMap'
        """Copy the first mapping; the defaults mappings are shared."""
        return self.__class__(self.maps[0].copy(), *self.maps[1:])
    __copy__ = copy  # Py2

    def _iter(self, op):
        # type: (Callable) -> Iterable
        # defaults must be first in the stream, so values in
        # changes take precedence.
        # pylint: disable=bad-reversed-sequence
        #   Someone should teach pylint about properties.
        return chain(*(op(d) for d in reversed(self.maps)))

    def _iterate_keys(self):
        # type: () -> Iterable
        return uniq(self._iter(lambda d: d.keys()))
    iterkeys = _iterate_keys

    def _iterate_items(self):
        # type: () -> Iterable
        return ((key, self[key]) for key in self)
    iteritems = _iterate_items

    def _iterate_values(self):
        # type: () -> Iterable
        return (self[key] for key in self)
    itervalues = _iterate_values

    keys = _iterate_keys
    items = _iterate_items
    values = _iterate_values
class ConfigurationView(ChainMap, AttributeDictMixin):
    """A view over an applications configuration dictionaries.

    Custom (but older) version of :class:`collections.ChainMap`.

    If the key does not exist in ``changes``, the ``defaults``
    dictionaries are consulted.

    Arguments:
        changes (Mapping): Map of configuration changes.
        defaults (List[Mapping]): List of dictionaries containing
            the default configuration.
        keys (List[Callable]): Optional functions that map a requested
            key to an alternative key; those are tried after the key
            itself (and its prefixed form) is not found.
        prefix (str): Optional key prefix.  Lookups try the prefixed key
            before the key itself.
    """

    def __init__(self, changes, defaults=None, keys=None, prefix=None):
        # type: (Mapping, List[Mapping], List[Callable], str) -> None
        defaults = [] if defaults is None else defaults
        super().__init__(changes, *defaults)
        # Set through __dict__: AttributeDictMixin.__setattr__ would store
        # these as configuration keys instead.
        self.__dict__.update(
            # Normalize the prefix to end with exactly one underscore.
            prefix=prefix.rstrip('_') + '_' if prefix else prefix,
            _keys=keys,
        )

    def _to_keys(self, key):
        # type: (str) -> Sequence[str]
        """Return the keys to try for ``key``, in lookup order.

        With a prefix this is ``(prefixed_key, key)``, where the prefixed
        key is passed through ``match_case`` together with the prefix;
        without a prefix it is ``(key,)``.
        """
        prefix = self.prefix
        if prefix:
            pkey = prefix + key if not key.startswith(prefix) else key
            return match_case(pkey, prefix), key
        return key,

    def __getitem__(self, key):
        # type: (str) -> Any
        keys = self._to_keys(key)
        getitem = super().__getitem__
        for k in keys + (
                tuple(f(key) for f in self._keys) if self._keys else ()):
            try:
                return getitem(k)
            except KeyError:
                pass
        try:
            # support subclasses implementing __missing__
            return self.__missing__(key)
        except KeyError:
            if len(keys) > 1:
                # keys is (prefixed_key, key): report both of them.
                raise KeyError(
                    'Key not found: {1!r} (with prefix: {0!r})'.format(*keys))
            raise

    def __setitem__(self, key, value):
        # type: (str, Any) -> Any
        self.changes[self._key(key)] = value

    def first(self, *keys):
        # type: (*str) -> Any
        """Return the first value among ``keys`` accepted by ``first``."""
        # ``first`` here is the module-level helper, not this method.
        return first(None, (self.get(key) for key in keys))

    def get(self, key, default=None):
        # type: (str, Any) -> Any
        try:
            return self[key]
        except KeyError:
            return default

    def clear(self):
        # type: () -> None
        """Remove all changes, but keep defaults."""
        self.changes.clear()

    def __contains__(self, key):
        # type: (str) -> bool
        keys = self._to_keys(key)
        return any(any(k in m for k in keys) for m in self.maps)

    def copy(self):
        # type: () -> 'ConfigurationView'
        """Return a copy with its own ``changes``; defaults are shared.

        ChainMap.copy() would pass each defaults mapping positionally,
        landing them in the ``defaults``/``keys`` parameters, and would
        drop ``prefix`` and ``keys``.
        """
        clone = self.__class__(
            self.maps[0].copy(), self.maps[1:],
            keys=self._keys, prefix=self.prefix,
        )
        # key_t is not a constructor argument here (swap_with may set it).
        clone.__dict__['key_t'] = self.key_t
        return clone
    __copy__ = copy

    def swap_with(self, other):
        # type: (ConfigurationView) -> None
        """Make this view share ``other``'s maps, ``key_t`` and ``prefix``.

        The maps are shared, not copied; ``_keys`` is left unchanged.
        """
        changes = other.__dict__['changes']
        defaults = other.__dict__['defaults']
        self.__dict__.update(
            changes=changes,
            defaults=defaults,
            key_t=other.__dict__['key_t'],
            prefix=other.__dict__['prefix'],
            maps=[changes] + defaults
        )
class LimitedSet:
    """Kind-of Set (or priority queue) with limitations.

    Good for when you need to test for membership (`a in set`),
    but the set should not grow unbounded.

    ``maxlen`` is enforced at all times, so if the limit is reached
    we'll also remove non-expired items.

    You can also configure ``minlen``: this is the minimal residual size
    of the set.

    All arguments are optional, and no limits are enabled by default.

    Arguments:
        maxlen (int): Optional max number of items.
            Adding more items than ``maxlen`` will result in immediate
            removal of items sorted by oldest insertion time.

        expires (float): TTL for all items.
            Expired items are removed by :meth:`purge`, which :meth:`add`
            calls once ``maxlen`` is reached.

        minlen (int): Minimal residual size of this set.
            .. versionadded:: 4.0

            Value must be less than or equal to ``maxlen``.

            Older expired items will be deleted, only after the set
            exceeds ``minlen`` number of items.

        data (Sequence): Initial data to initialize set with.
            Can be an iterable of items (each added with the current
            time), a dict (``{key: insertion_time}``), or another instance
            of :class:`LimitedSet`.

    Example:
        >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
        >>> for i in range(60000):
        ...     s.add(i)
        ...     s.add(str(i))
        ...
        >>> 57000 in s  # last 50k inserted values are kept
        True
        >>> '10' in s  # '10' did expire and was purged from set.
        False
        >>> len(s)  # maxlen is reached
        50000
        >>> s.purge(now=time.monotonic() + 7200)  # clock + 2 hours
        >>> len(s)  # now only minlen items are cached
        4000
        >>> 57000 in s  # even this item is gone now
        False
    """

    max_heap_percent_overload = 15

    def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
        # type: (int, float, Mapping, int) -> None
        self.maxlen = 0 if maxlen is None else maxlen
        self.minlen = 0 if minlen is None else minlen
        self.expires = 0 if expires is None else expires
        # item -> (inserted_time, item); the authoritative membership.
        self._data = {}
        # Heap of (inserted_time, item); may also hold stale entries of
        # discarded/re-added items until the next _refresh_heap().
        self._heap = []

        if data:
            # import items from data
            self.update(data)

        if not self.maxlen >= self.minlen >= 0:
            raise ValueError(
                'minlen must be a positive number, less or equal to maxlen.')
        if self.expires < 0:
            raise ValueError('expires cannot be negative!')

    def _refresh_heap(self):
        # type: () -> None
        """Time consuming recreating of heap. Don't run this too often."""
        self._heap[:] = list(self._data.values())
        heapify(self._heap)

    def _maybe_refresh_heap(self):
        # type: () -> None
        if self._heap_overload >= self.max_heap_percent_overload:
            self._refresh_heap()

    def _is_current(self, entry):
        # type: (tuple) -> bool
        """Return whether heap ``entry`` is the live entry for its item.

        :meth:`discard` (also used by :meth:`add` when re-adding an item)
        only removes the item from ``_data`` and leaves its old heap entry
        behind.  Such a stale entry must not expire or remove the item.
        """
        return self._data.get(entry[1]) == entry

    def clear(self):
        # type: () -> None
        """Clear all data, start from scratch again."""
        self._data.clear()
        self._heap[:] = []

    def add(self, item, now=None):
        # type: (Any, float) -> None
        """Add a new item, or reset the expiry time of an existing item."""
        now = now or time.monotonic()
        if item in self._data:
            self.discard(item)
        entry = (now, item)
        self._data[item] = entry
        heappush(self._heap, entry)
        if self.maxlen and len(self._data) >= self.maxlen:
            self.purge()

    def update(self, other):
        # type: (Iterable) -> None
        """Update this set from other LimitedSet, dict or iterable."""
        if not other:
            return
        if isinstance(other, LimitedSet):
            self._data.update(other._data)
            self._refresh_heap()
            self.purge()
        elif isinstance(other, dict):
            # revokes are sent as a dict
            for key, inserted in other.items():
                if isinstance(inserted, (tuple, list)):
                    # in case someone uses ._data directly for sending update
                    inserted = inserted[0]
                if not isinstance(inserted, float):
                    raise ValueError(
                        'Expecting float timestamp, got type '
                        f'{type(inserted)!r} with value: {inserted}')
                self.add(key, inserted)
        else:
            # XXX AVOID THIS, it could keep old data if more parties
            # exchange them all over and over again
            for obj in other:
                self.add(obj)

    def discard(self, item):
        # type: (Any) -> None
        # mark an existing item as removed. If KeyError is not found, pass.
        self._data.pop(item, None)
        self._maybe_refresh_heap()
    pop_value = discard

    def purge(self, now=None):
        # type: (float) -> None
        """Check oldest items and remove them if needed.

        Arguments:
            now (float): Time of purging -- by default right now.
                This can be useful for unit testing.
        """
        now = now or time.monotonic()
        now = now() if isinstance(now, Callable) else now
        if self.maxlen:
            while len(self._data) > self.maxlen:
                self.pop()
        # time based expiring:
        if self.expires:
            while len(self._data) > self.minlen >= 0:
                entry = self._heap[0]
                if not self._is_current(entry):
                    # Stale timestamp of a discarded/re-added item: drop it
                    # instead of expiring the item it refers to.
                    heappop(self._heap)
                    continue
                if entry[0] + self.expires > now:
                    break  # oldest item hasn't expired yet
                self.pop()

    def pop(self, default=None):
        # type: (Any) -> Any
        """Remove and return the oldest item, or ``default`` when empty."""
        while self._heap:
            entry = heappop(self._heap)
            if self._is_current(entry):
                del self._data[entry[1]]
                return entry[1]
            # otherwise a stale entry (discarded or re-added item): skip it.
        return default

    def as_dict(self):
        # type: () -> Dict
        """Whole set as serializable dictionary.

        Example:
            >>> s = LimitedSet(maxlen=200)
            >>> r = LimitedSet(maxlen=200)
            >>> for i in range(500):
            ...     s.add(i)
            ...
            >>> r.update(s.as_dict())
            >>> r == s
            True
        """
        return {key: inserted for inserted, key in self._data.values()}

    def __eq__(self, other):
        # type: (Any) -> bool
        try:
            other_data = other._data
        except AttributeError:
            # Unrelated type: let Python try the reflected comparison.
            return NotImplemented
        return self._data == other_data

    def __ne__(self, other):
        # type: (Any) -> bool
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __repr__(self):
        # type: () -> str
        return REPR_LIMITED_SET.format(
            self, name=type(self).__name__, size=len(self),
        )

    def __iter__(self):
        # type: () -> Iterable
        return (i for _, i in sorted(self._data.values()))

    def __len__(self):
        # type: () -> int
        return len(self._data)

    def __contains__(self, key):
        # type: (Any) -> bool
        return key in self._data

    def __reduce__(self):
        # type: () -> Any
        return self.__class__, (
            self.maxlen, self.expires, self.as_dict(), self.minlen)

    def __bool__(self):
        # type: () -> bool
        return bool(self._data)
    __nonzero__ = __bool__  # Py2

    @property
    def _heap_overload(self):
        # type: () -> float
        """Compute how much is heap bigger than data [percents]."""
        return len(self._heap) * 100 / max(len(self._data), 1) - 100


MutableSet.register(LimitedSet)
class Evictable:
    """Mixin for classes supporting the ``evict`` method.

    Subclasses provide ``maxsize``, an ``_evictcount`` (current size) and
    ``_pop_to_evict()``, which removes one element or raises ``Empty``.
    """

    Empty = Empty

    def evict(self):
        # type: () -> None
        """Force evict until maxsize is enforced."""
        # ``count`` yields forever; only _evict1's IndexError stops the loop.
        self._evict(range=count)

    def _evict(self, limit=100, range=range):
        # type: (int, Callable) -> None
        """Evict up to ``limit`` elements, stopping early once within bounds.

        ``range`` is called with ``limit`` to produce the iterations; pass
        :func:`itertools.count` to evict without an upper bound.
        """
        # A plain loop: the former list comprehension kept a growing list
        # of None results, unbounded when driven by count().
        try:
            for _ in range(limit):
                self._evict1()
        except IndexError:
            pass

    def _evict1(self):
        # type: () -> None
        """Evict one element; raise IndexError when there is nothing to do."""
        if self._evictcount <= self.maxsize:
            raise IndexError()
        try:
            self._pop_to_evict()
        except self.Empty:
            raise IndexError()
class Messagebuffer(Evictable):
    """A buffer of pending messages.

    FIFO queue backed by a deque.  When ``maxsize`` is non-zero, adding
    items triggers eviction of the oldest items (see :class:`Evictable`).
    """

    Empty = Empty

    def __init__(self, maxsize, iterable=None, deque=deque):
        # type: (int, Iterable, Any) -> None
        self.maxsize = maxsize
        self.data = deque(iterable or [])
        # Cache bound deque methods.
        self._append = self.data.append
        self._pop = self.data.popleft
        self._len = self.data.__len__
        self._extend = self.data.extend

    def put(self, item):
        # type: (Any) -> None
        """Append ``item``, then evict oldest items if over ``maxsize``.

        Required by :class:`BufferMap`, which calls ``buf.put(item)``.
        """
        self._append(item)
        if self.maxsize:
            self._evict()

    def extend(self, it):
        # type: (Iterable) -> None
        """Append all items of ``it``, then evict if over ``maxsize``."""
        self._extend(it)
        if self.maxsize:
            self._evict()

    def take(self, *default):
        # type: (*Any) -> Any
        """Remove and return the oldest item.

        Returns ``default[0]`` when the buffer is empty and a default was
        given, otherwise raises :attr:`Empty`.
        """
        try:
            return self._pop()
        except IndexError:
            if default:
                return default[0]
            raise self.Empty()

    def _pop_to_evict(self):
        # type: () -> None
        return self.take()

    def __repr__(self):
        # type: () -> str
        return f'<{type(self).__name__}: {len(self)}/{self.maxsize}>'

    def __iter__(self):
        # type: () -> Iterable
        """Yield items oldest first, removing them (consumes the buffer)."""
        while True:
            try:
                yield self._pop()
            except IndexError:
                break

    def __len__(self):
        # type: () -> int
        return self._len()

    def __contains__(self, item):
        # type: (Any) -> bool
        return item in self.data

    def __reversed__(self):
        # type: () -> Iterable
        return reversed(self.data)

    def __getitem__(self, index):
        # type: (Any) -> Any
        return self.data[index]

    @property
    def _evictcount(self):
        # type: () -> int
        return len(self)


Sequence.register(Messagebuffer)
class BufferMap(OrderedDict, Evictable):
    """Map of buffers.

    Maps keys to :class:`Messagebuffer` instances and keeps the number of
    buffered items in ``total``.  Key order tracks use: the first key is
    the least recently used and loses items first when ``total`` exceeds
    ``maxsize``.
    """

    Buffer = Messagebuffer
    Empty = Empty

    maxsize = None
    total = 0
    bufmaxsize = None

    def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
        # type: (int, Iterable, int) -> None
        super().__init__()
        self.maxsize = maxsize
        # Honour the argument (it used to be ignored in favour of 1000).
        self.bufmaxsize = bufmaxsize
        if iterable:
            self.update(iterable)
        # Count buffered items, not (key, buffer) pairs.
        self.total = sum(len(buf) for buf in self.values())

    def put(self, key, item):
        # type: (Any, Any) -> None
        """Add ``item`` to the buffer for ``key``, creating it if needed."""
        buf = self._get_or_create_buffer(key)
        size_before = len(buf)
        buf.put(item)
        # The buffer may evict its own items (bufmaxsize), so count the
        # actual change in size rather than assuming +1.
        self.total += len(buf) - size_before
        self.move_to_end(key)  # mark as most recently used.
        if self.maxsize:
            self._evict()

    def extend(self, key, it):
        # type: (Any, Iterable) -> None
        """Add all items of ``it`` to the buffer for ``key``."""
        buf = self._get_or_create_buffer(key)
        size_before = len(buf)
        buf.extend(it)
        # Measured instead of len(it): works for iterators and accounts
        # for items evicted by the buffer itself.
        self.total += len(buf) - size_before
        if self.maxsize:
            self._evict()

    def take(self, key, *default):
        # type: (Any, *Any) -> Any
        """Remove and return the oldest item buffered for ``key``.

        Returns ``default[0]`` if given and nothing is available,
        otherwise raises :attr:`Empty`.
        """
        item, throw = None, False
        try:
            buf = self[key]
        except KeyError:
            throw = True
        else:
            try:
                item = buf.take()
                self.total -= 1
            except self.Empty:
                throw = True
            else:
                self.move_to_end(key)  # mark as most recently used.
        if throw:
            if default:
                return default[0]
            raise self.Empty()
        return item

    def _get_or_create_buffer(self, key):
        # type: (Any) -> Messagebuffer
        try:
            return self[key]
        except KeyError:
            buf = self[key] = self._new_buffer()
            return buf

    def _new_buffer(self):
        # type: () -> Messagebuffer
        return self.Buffer(maxsize=self.bufmaxsize)

    def _LRUpop(self, *default):
        # type: (*Any) -> Any
        """Take an item from the least recently used buffer."""
        buf = self[self._LRUkey()]
        size_before = len(buf)
        item = buf.take(*default)
        # Keep ``total`` in sync (unchanged when the default was returned).
        self.total -= size_before - len(buf)
        return item

    def _pop_to_evict(self):
        # type: () -> None
        """Remove one item from the least recently used non-empty buffer.

        Empty buffers found on the way are dropped from the mapping.  Gives
        up after 100 attempts; raises :attr:`Empty` if no buffers remain.
        """
        for _ in range(100):
            if not self:
                # Nothing left; _LRUkey() would raise StopIteration.
                raise self.Empty()
            key = self._LRUkey()
            buf = self[key]
            try:
                buf.take()
            except (IndexError, self.Empty):
                # buffer empty, remove it from mapping.
                self.pop(key)
            else:
                # we removed one item
                self.total -= 1
                # if buffer is empty now, remove it from mapping.
                if not len(buf):
                    self.pop(key)
                else:
                    # move to the end (most recently used position).
                    self.move_to_end(key)
                break

    def __repr__(self):
        # type: () -> str
        return f'<{type(self).__name__}: {self.total}/{self.maxsize}>'

    @property
    def _evictcount(self):
        # type: () -> int
        return self.total