This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Source code for kombu.abstract

"""Object utilities."""

from copy import copy

from .connection import maybe_channel
from .exceptions import NotBoundError
from .utils.functional import ChannelPromise

__all__ = ('Object', 'MaybeChannelBound')


def unpickle_dict(cls, kwargs):
    """Rebuild an instance by calling ``cls`` with ``kwargs`` as keywords.

    Returned by :meth:`Object.__reduce__` as the reconstruction callable,
    paired with ``(class, attribute-dict)`` as its arguments.
    """
    instance = cls(**kwargs)
    return instance


def _any(v):
    """Return ``v`` unchanged.

    Identity converter: ``Object.__init__`` falls back to it when an entry
    in ``attrs`` declares no type, so the supplied value is stored as-is.
    """
    return v


class Object:
    """Common base class.

    Subclasses list their fields in ``attrs`` as ``(name, converter)``
    pairs.  The constructor turns matching keyword arguments into instance
    attributes, and the same table drives :meth:`as_dict`, pickling and
    shallow copying.
    """

    attrs = ()

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted but not used here.
        for name, converter in self.attrs:
            supplied = kwargs.get(name)
            if supplied is None:
                # Nothing (or an explicit None) was passed: keep whatever
                # the attribute already resolves to, e.g. a class-level
                # default, and only create it as None when it is missing.
                if not hasattr(self, name):
                    setattr(self, name, None)
                continue
            # A falsy converter means "store the value untouched".
            convert = converter or _any
            setattr(self, name, convert(supplied))

    def as_dict(self, recurse=False):
        """Return the declared attributes as a ``{name: value}`` dict.

        Each non-``None`` value is passed through its converter from
        ``attrs`` when one is declared.  With ``recurse=True`` any value
        that is itself an :class:`Object` is expanded into its own dict
        instead.
        """
        def export(value, converter):
            if recurse and isinstance(value, Object):
                return value.as_dict(recurse=True)
            if converter and value is not None:
                return converter(value)
            return value

        exported = {}
        for name, converter in self.attrs:
            exported[name] = export(getattr(self, name), converter)
        return exported

    def __reduce__(self):
        # Pickle as "call unpickle_dict(class, attribute-dict)".
        state = self.as_dict()
        return unpickle_dict, (self.__class__, state)

    def __copy__(self):
        # A copy is a fresh instance built from the current attributes.
        klass = self.__class__
        return klass(**self.as_dict())


class MaybeChannelBound(Object):
    """Mixin for classes that can be bound to an AMQP channel."""

    # Channel this instance is bound to; may also hold a ChannelPromise,
    # which the ``channel`` property resolves on first access.
    _channel = None
    # Set to True by maybe_bind() after when_bound() has run.
    _is_bound = False

    #: Defines whether maybe_declare can skip declaring this entity twice.
    can_cache_declaration = False

    def __call__(self, channel):
        """`self(channel) -> self.bind(channel)`."""
        return self.bind(channel)

    def bind(self, channel):
        """Create copy of the instance that is bound to a channel."""
        return copy(self).maybe_bind(channel)

    def maybe_bind(self, channel):
        """Bind instance to channel if not already bound.

        Does nothing when the instance is already bound or ``channel`` is
        falsy.  Returns ``self`` in every case so calls can be chained.
        """
        if not self.is_bound and channel:
            self._channel = maybe_channel(channel)
            # The callback runs before the flag is set, so ``is_bound`` is
            # still False while when_bound() executes on first bind.
            self.when_bound()
            self._is_bound = True
        return self

    def revive(self, channel):
        """Revive channel after the connection has been re-established.

        Used by :meth:`~kombu.Connection.ensure`.

        Only instances that are already bound are touched: the channel is
        replaced and :meth:`when_bound` is invoked again.
        """
        if self.is_bound:
            self._channel = channel
            self.when_bound()

    def when_bound(self):
        """Callback called when the class is bound."""

    def __repr__(self):
        return self._repr_entity(type(self).__name__)

    def _repr_entity(self, item=''):
        """Return the repr text, using ``item`` as the entity label.

        An empty ``item`` falls back to the class name.
        """
        item = item or type(self).__name__
        if self.is_bound:
            return f'<{item} bound to chan:{self.channel.channel_id}>'
        return f'<unbound {item}>'

    @property
    def is_bound(self):
        """Flag set if the channel is bound."""
        return self._is_bound and self._channel is not None

    @property
    def channel(self):
        """Current channel if the object is bound.

        Raises:
            NotBoundError: If no channel has been set on this instance.
        """
        channel = self._channel
        if channel is None:
            raise NotBoundError(
                f"Can't call method on {type(self).__name__} "
                f"not bound to a channel")
        if isinstance(channel, ChannelPromise):
            # Resolve the promise once and cache the real channel.
            channel = self._channel = channel()
        return channel