This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.transport.memory
"""In-memory transport module for Kombu.
Simple transport using memory for storing messages.
Messages can be passed only between threads.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: Yes
Connection String
=================
Connection string is in the following format:
.. code-block::
memory://
"""
from collections import defaultdict
from queue import Queue
from . import base, virtual
class Channel(virtual.Channel):
    """In-memory Channel.

    Messages are held in :class:`queue.Queue` objects stored in the
    class-level :attr:`queues` mapping, so every channel instance that has
    not rebound ``queues`` (see :meth:`close`) works on the same queues.
    """

    # NOTE(review): not referenced anywhere in this class; presumably used
    # by the virtual base class or by callers -- confirm before changing.
    events = defaultdict(set)

    # Queue name -> ``queue.Queue``.  Defined on the class, hence shared by
    # all instances until ``close()`` rebinds it on a particular instance.
    queues = {}

    # NOTE(review): presumably tells the virtual base class not to restore
    # unacknowledged messages -- confirm against ``virtual.Channel``.
    do_restore = False

    # Fanout delivery is provided by ``_put_fanout`` below.
    supports_fanout = True

    def _has_queue(self, queue, **kwargs):
        """Return ``True`` if a queue named *queue* is registered."""
        return queue in self.queues

    def _new_queue(self, queue, **kwargs):
        """Ensure a queue named *queue* exists; an existing one is kept."""
        # Same create-if-missing logic as ``_queue_for``; reuse it rather
        # than duplicating the check.  The return value is discarded so
        # this method keeps returning ``None``.
        self._queue_for(queue)

    def _get(self, queue, timeout=None):
        """Pop the next message from *queue* without blocking.

        *timeout* is accepted but not used.  ``Queue.get(block=False)``
        raises :exc:`queue.Empty` when no message is available.
        """
        return self._queue_for(queue).get(block=False)

    def _queue_for(self, queue):
        """Return the ``Queue`` named *queue*, creating it if missing."""
        if queue not in self.queues:
            self.queues[queue] = Queue()
        return self.queues[queue]

    def _queue_bind(self, *args):
        """Do nothing; this channel keeps no binding state of its own."""
        pass

    def _put_fanout(self, exchange, message, routing_key=None, **kwargs):
        """Put *message* on every queue ``self._lookup`` returns."""
        for queue in self._lookup(exchange, routing_key):
            self._queue_for(queue).put(message)

    def _put(self, queue, message, **kwargs):
        """Append *message* to the queue named *queue*."""
        self._queue_for(queue).put(message)

    def _size(self, queue):
        """Return the number of messages currently in *queue*."""
        return self._queue_for(queue).qsize()

    def _delete(self, queue, *args, **kwargs):
        """Forget the queue named *queue*; a missing queue is ignored."""
        self.queues.pop(queue, None)

    def _purge(self, queue):
        """Remove all messages from *queue* and return how many there were."""
        q = self._queue_for(queue)
        # Count and clear while holding the queue's own lock so a message
        # put by another thread in between cannot be discarded without
        # being counted.  ``qsize()`` takes the same (non-reentrant) lock,
        # so the length is read from the underlying deque directly.
        with q.mutex:
            size = len(q.queue)
            q.queue.clear()
        return size

    def close(self):
        """Close the channel and detach it from the shared queue mapping."""
        super().close()
        # The previous loop calling ``Queue.empty()`` on every queue was a
        # no-op: ``empty()`` only reports whether a queue is empty, it does
        # not remove anything.  Assigning on the instance shadows the
        # class-level mapping, which is left intact for other channels.
        self.queues = {}
class Transport(virtual.Transport):
    """In-memory Transport.

    All instances share one broker state object: ``__init__`` points
    ``self.state`` at the class-level :attr:`global_state`.
    """

    driver_type = 'memory'
    driver_name = 'memory'

    Channel = Channel

    # Reuse the capability declaration of ``base.Transport`` instead of the
    # one inherited from ``virtual.Transport``.
    implements = base.Transport.implements

    #: memory backend state is global.
    global_state = virtual.BrokerState()

    def __init__(self, client, **kwargs):
        """Initialise the base transport, then attach the shared state."""
        super().__init__(client, **kwargs)
        self.state = self.global_state