This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.transport.sqlalchemy.models
"""Kombu transport using SQLAlchemy as the message store."""
import datetime
from sqlalchemy import (Boolean, Column, DateTime, ForeignKey, Index, Integer,
Sequence, SmallInteger, String, Text)
from sqlalchemy.orm import relation
from sqlalchemy.schema import MetaData
try:
from sqlalchemy.orm import declarative_base, declared_attr
except ImportError:
# TODO: Remove this once we drop support for SQLAlchemy < 1.4.
from sqlalchemy.ext.declarative import declarative_base, declared_attr
class_registry = {}
metadata = MetaData()
ModelBase = declarative_base(metadata=metadata, class_registry=class_registry)
[docs]class Queue:
"""The queue class."""
__table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True,
autoincrement=True)
name = Column(String(200), unique=True)
def __init__(self, name):
self.name = name
def __str__(self):
return f'<Queue({self.name})>'
@declared_attr
def messages(cls):
return relation('Message', backref='queue', lazy='noload')
[docs]class Message:
"""The message class."""
__table_args__ = (
Index('ix_kombu_message_timestamp_id', 'timestamp', 'id'),
{'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
)
id = Column(Integer, Sequence('message_id_sequence'),
primary_key=True, autoincrement=True)
visible = Column(Boolean, default=True, index=True)
sent_at = Column('timestamp', DateTime, nullable=True, index=True,
onupdate=datetime.datetime.now)
payload = Column(Text, nullable=False)
version = Column(SmallInteger, nullable=False, default=1)
__mapper_args__ = {'version_id_col': version}
def __init__(self, payload, queue):
self.payload = payload
self.queue = queue
def __str__(self):
return '<Message: {0.sent_at} {0.payload} {0.queue_id}>'.format(self)
@declared_attr
def queue_id(self):
return Column(
Integer,
ForeignKey(
'%s.id' % class_registry['Queue'].__tablename__,
name='FK_kombu_message_queue'
)
)