This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.transport.sqlalchemy
"""SQLAlchemy Transport module for kombu.
Kombu transport using SQL Database as the message store.
Features
========
* Type: Virtual
* Supports Direct: yes
* Supports Topic: yes
* Supports Fanout: no
* Supports Priority: no
* Supports TTL: no
Connection String
=================
.. code-block::
sqla+SQL_ALCHEMY_CONNECTION_STRING
sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING
For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.
Examples:
.. code-block::
# PostgreSQL with default driver
sqla+postgresql://scott:tiger@localhost/mydatabase
# PostgreSQL with psycopg2 driver
sqla+postgresql+psycopg2://scott:tiger@localhost/mydatabase
# PostgreSQL with pg8000 driver
sqla+postgresql+pg8000://scott:tiger@localhost/mydatabase
# MySQL with default driver
sqla+mysql://scott:tiger@localhost/foo
# MySQL with mysqlclient driver (a maintained fork of MySQL-Python)
sqla+mysql+mysqldb://scott:tiger@localhost/foo
# MySQL with PyMySQL driver
sqla+mysql+pymysql://scott:tiger@localhost/foo
Transport Options
=================
* ``queue_tablename``: Name of table storing queues.
* ``message_tablename``: Name of table storing messages.
Moreover parameters of :func:`sqlalchemy.create_engine()` function can be passed as transport options.
"""
# SQLAlchemy overrides != False to have special meaning and pep8 complains
# flake8: noqa
import threading
from json import dumps, loads
from queue import Empty
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str
from .models import Message as MessageBase
from .models import ModelBase
from .models import Queue as QueueBase
from .models import class_registry, metadata
VERSION = (1, 4, 1)
__version__ = '.'.join(map(str, VERSION))
_MUTEX = threading.RLock()
[docs]class Channel(virtual.Channel):
"""The channel class."""
_session = None
_engines = {} # engine cache
def __init__(self, connection, **kwargs):
self._configure_entity_tablenames(connection.client.transport_options)
super().__init__(connection, **kwargs)
def _configure_entity_tablenames(self, opts):
self.queue_tablename = opts.get('queue_tablename', 'kombu_queue')
self.message_tablename = opts.get('message_tablename', 'kombu_message')
#
# Define the model definitions. This registers the declarative
# classes with the active SQLAlchemy metadata object. This *must* be
# done prior to the ``create_engine`` call.
#
self.queue_cls and self.message_cls
def _engine_from_config(self):
conninfo = self.connection.client
transport_options = conninfo.transport_options.copy()
transport_options.pop('queue_tablename', None)
transport_options.pop('message_tablename', None)
return create_engine(conninfo.hostname, **transport_options)
def _open(self):
conninfo = self.connection.client
if conninfo.hostname not in self._engines:
with _MUTEX:
if conninfo.hostname in self._engines:
# Engine was created while we were waiting to
# acquire the lock.
return self._engines[conninfo.hostname]
engine = self._engine_from_config()
Session = sessionmaker(bind=engine)
metadata.create_all(engine)
self._engines[conninfo.hostname] = engine, Session
return self._engines[conninfo.hostname]
@property
def session(self):
if self._session is None:
_, Session = self._open()
self._session = Session()
return self._session
def _get_or_create(self, queue):
obj = self.session.query(self.queue_cls) \
.filter(self.queue_cls.name == queue).first()
if not obj:
with _MUTEX:
obj = self.session.query(self.queue_cls) \
.filter(self.queue_cls.name == queue).first()
if obj:
# Queue was created while we were waiting to
# acquire the lock.
return obj
obj = self.queue_cls(queue)
self.session.add(obj)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return obj
def _new_queue(self, queue, **kwargs):
self._get_or_create(queue)
def _put(self, queue, payload, **kwargs):
obj = self._get_or_create(queue)
message = self.message_cls(dumps(payload), obj)
self.session.add(message)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
def _get(self, queue):
obj = self._get_or_create(queue)
if self.session.bind.name == 'sqlite':
self.session.execute('BEGIN IMMEDIATE TRANSACTION')
try:
msg = self.session.query(self.message_cls) \
.with_for_update() \
.filter(self.message_cls.queue_id == obj.id) \
.filter(self.message_cls.visible != False) \
.order_by(self.message_cls.sent_at) \
.order_by(self.message_cls.id) \
.limit(1) \
.first()
if msg:
msg.visible = False
return loads(bytes_to_str(msg.payload))
raise Empty()
finally:
self.session.commit()
def _query_all(self, queue):
obj = self._get_or_create(queue)
return self.session.query(self.message_cls) \
.filter(self.message_cls.queue_id == obj.id)
def _purge(self, queue):
count = self._query_all(queue).delete(synchronize_session=False)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return count
def _size(self, queue):
return self._query_all(queue).count()
def _declarative_cls(self, name, base, ns):
if name not in class_registry:
with _MUTEX:
if name in class_registry:
# Class was registered while we were waiting to
# acquire the lock.
return class_registry[name]
return type(str(name), (base, ModelBase), ns)
return class_registry[name]
@cached_property
def queue_cls(self):
return self._declarative_cls(
'Queue',
QueueBase,
{'__tablename__': self.queue_tablename}
)
@cached_property
def message_cls(self):
return self._declarative_cls(
'Message',
MessageBase,
{'__tablename__': self.message_tablename}
)