This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.backends.consul

"""Consul result store backend.

- :class:`ConsulBackend` implements KeyValueStoreBackend to store results
    in the key-value store of Consul.
"""
from kombu.utils.encoding import bytes_to_str
from kombu.utils.url import parse_url

from celery.backends.base import KeyValueStoreBackend
from celery.exceptions import ImproperlyConfigured
from celery.utils.log import get_logger

try:
    import consul
except ImportError:
    consul = None

logger = get_logger(__name__)

__all__ = ('ConsulBackend',)

CONSUL_MISSING = """\
You need to install the python-consul library in order to use \
the Consul result store backend."""


[docs]class ConsulBackend(KeyValueStoreBackend): """Consul.io K/V store backend for Celery.""" consul = consul supports_autoexpire = True consistency = 'consistent' path = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.consul is None: raise ImproperlyConfigured(CONSUL_MISSING) # # By default, for correctness, we use a client connection per # operation. If set, self.one_client will be used for all operations. # This provides for the original behaviour to be selected, and is # also convenient for mocking in the unit tests. # self.one_client = None self._init_from_params(**parse_url(self.url)) def _init_from_params(self, hostname, port, virtual_host, **params): logger.debug('Setting on Consul client to connect to %s:%d', hostname, port) self.path = virtual_host self.hostname = hostname self.port = port # # Optionally, allow a single client connection to be used to reduce # the connection load on Consul by adding a "one_client=1" parameter # to the URL. # if params.get('one_client', None): self.one_client = self.client()
[docs] def client(self): return self.one_client or consul.Consul(host=self.hostname, port=self.port, consistency=self.consistency)
def _key_to_consul_key(self, key): key = bytes_to_str(key) return key if self.path is None else f'{self.path}/{key}'
[docs] def get(self, key): key = self._key_to_consul_key(key) logger.debug('Trying to fetch key %s from Consul', key) try: _, data = self.client().kv.get(key) return data['Value'] except TypeError: pass
[docs] def mget(self, keys): for key in keys: yield self.get(key)
[docs] def set(self, key, value): """Set a key in Consul. Before creating the key it will create a session inside Consul where it creates a session with a TTL The key created afterwards will reference to the session's ID. If the session expires it will remove the key so that results can auto expire from the K/V store """ session_name = bytes_to_str(key) key = self._key_to_consul_key(key) logger.debug('Trying to create Consul session %s with TTL %d', session_name, self.expires) client = self.client() session_id = client.session.create(name=session_name, behavior='delete', ttl=self.expires) logger.debug('Created Consul session %s', session_id) logger.debug('Writing key %s to Consul', key) return client.kv.put(key=key, value=value, acquire=session_id)
[docs] def delete(self, key): key = self._key_to_consul_key(key) logger.debug('Removing key %s from Consul', key) return self.client().kv.delete(key)