This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.backends.cassandra

"""Apache Cassandra result store backend using the DataStax driver."""
import threading

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.utils.log import get_logger

from .base import BaseBackend

try:  # pragma: no cover
    import cassandra
    import cassandra.auth
    import cassandra.cluster
    import cassandra.query
except ImportError:  # pragma: no cover
    cassandra = None


__all__ = ('CassandraBackend',)

logger = get_logger(__name__)

E_NO_CASSANDRA = """
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
"""

E_NO_SUCH_CASSANDRA_AUTH_PROVIDER = """
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
"""

Q_INSERT_RESULT = """
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
"""

Q_SELECT_RESULT = """
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
"""

Q_CREATE_RESULT_TABLE = """
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
"""

Q_EXPIRES = """
    USING TTL {0}
"""


def buf_t(x):
    return bytes(x, 'utf8')


[docs]class CassandraBackend(BaseBackend): """Cassandra backend utilizing DataStax driver. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`cassandra-driver` is not available, or if the :setting:`cassandra_servers` setting is not set. """ #: List of Cassandra servers with format: ``hostname``. servers = None supports_autoexpire = True # autoexpire supported via entry_ttl def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None, port=9042, **kwargs): super().__init__(**kwargs) if not cassandra: raise ImproperlyConfigured(E_NO_CASSANDRA) conf = self.app.conf self.servers = servers or conf.get('cassandra_servers', None) self.port = port or conf.get('cassandra_port', None) self.keyspace = keyspace or conf.get('cassandra_keyspace', None) self.table = table or conf.get('cassandra_table', None) self.cassandra_options = conf.get('cassandra_options', {}) if not self.servers or not self.keyspace or not self.table: raise ImproperlyConfigured('Cassandra backend not configured.') expires = entry_ttl or conf.get('cassandra_entry_ttl', None) self.cqlexpires = ( Q_EXPIRES.format(expires) if expires is not None else '') read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM' write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM' self.read_consistency = getattr( cassandra.ConsistencyLevel, read_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM) self.write_consistency = getattr( cassandra.ConsistencyLevel, write_cons, cassandra.ConsistencyLevel.LOCAL_QUORUM) self.auth_provider = None auth_provider = conf.get('cassandra_auth_provider', None) auth_kwargs = conf.get('cassandra_auth_kwargs', None) if auth_provider and auth_kwargs: auth_provider_class = getattr(cassandra.auth, auth_provider, None) if not auth_provider_class: raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER) self.auth_provider = auth_provider_class(**auth_kwargs) self._cluster = None self._session = None self._write_stmt = None self._read_stmt = None self._lock = threading.RLock() def _get_connection(self, write=False): """Prepare the connection for action. Arguments: write (bool): are we a writer? """ if self._session is not None: return self._lock.acquire() try: if self._session is not None: return self._cluster = cassandra.cluster.Cluster( self.servers, port=self.port, auth_provider=self.auth_provider, **self.cassandra_options) self._session = self._cluster.connect(self.keyspace) # We're forced to do concatenation below, as formatting would # blow up on superficial %s that'll be processed by Cassandra self._write_stmt = cassandra.query.SimpleStatement( Q_INSERT_RESULT.format( table=self.table, expires=self.cqlexpires), ) self._write_stmt.consistency_level = self.write_consistency self._read_stmt = cassandra.query.SimpleStatement( Q_SELECT_RESULT.format(table=self.table), ) self._read_stmt.consistency_level = self.read_consistency if write: # Only possible writers "workers" are allowed to issue # CREATE TABLE. This is to prevent conflicting situations # where both task-creator and task-executor would issue it # at the same time. # Anyway; if you're doing anything critical, you should # have created this table in advance, in which case # this query will be a no-op (AlreadyExists) make_stmt = cassandra.query.SimpleStatement( Q_CREATE_RESULT_TABLE.format(table=self.table), ) make_stmt.consistency_level = self.write_consistency try: self._session.execute(make_stmt) except cassandra.AlreadyExists: pass except cassandra.OperationTimedOut: # a heavily loaded or gone Cassandra cluster failed to respond. # leave this class in a consistent state if self._cluster is not None: self._cluster.shutdown() # also shuts down _session self._cluster = None self._session = None raise # we did fail after all - reraise finally: self._lock.release() def _store_result(self, task_id, result, state, traceback=None, request=None, **kwargs): """Store return value and state of an executed task.""" self._get_connection(write=True) self._session.execute(self._write_stmt, ( task_id, state, buf_t(self.encode(result)), self.app.now(), buf_t(self.encode(traceback)), buf_t(self.encode(self.current_task_children(request))) ))
[docs] def as_uri(self, include_password=True): return 'cassandra://'
def _get_task_meta_for(self, task_id): """Get task meta-data for a task by id.""" self._get_connection() res = self._session.execute(self._read_stmt, (task_id, )).one() if not res: return {'status': states.PENDING, 'result': None} status, result, date_done, traceback, children = res return self.meta_from_decoded({ 'task_id': task_id, 'status': status, 'result': self.decode(result), 'date_done': date_done, 'traceback': self.decode(traceback), 'children': self.decode(children), }) def __reduce__(self, args=(), kwargs=None): kwargs = {} if not kwargs else kwargs kwargs.update( {'servers': self.servers, 'keyspace': self.keyspace, 'table': self.table}) return super().__reduce__(args, kwargs)