This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.transport.pyro
"""Pyro transport module for kombu.
Pyro transport, and Kombu Broker daemon.
Requires the :mod:`Pyro4` library to be installed.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No
Connection String
=================
To use the Pyro transport with Kombu, use a URL of the form:
.. code-block::
pyro://localhost/kombu.broker
The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``
Transport Options
=================
"""
import sys
from queue import Empty, Queue
from kombu.exceptions import reraise
from kombu.log import get_logger
from kombu.utils.objects import cached_property
from . import virtual
try:
import Pyro4 as pyro
from Pyro4.errors import NamingError
from Pyro4.util import SerializerBase
except ImportError: # pragma: no cover
pyro = NamingError = SerializerBase = None
# Port used when looking up the Pyro name server.
DEFAULT_PORT = 9090

# Error-message templates; ``{0}`` is filled in with the connection object
# whose ``hostname`` / ``virtual_host`` attributes are interpolated.
E_NAMESERVER = (
    "Unable to locate pyro nameserver on host {0.hostname}"
)
E_LOOKUP = (
    "Unable to lookup '{0.virtual_host}' in pyro nameserver "
    "on host {0.hostname}"
)
# Module-level logger, named after this module.
logger = get_logger(__name__)
class Channel(virtual.Channel):
    """Pyro Channel.

    Every queue operation is forwarded to the object exposed by the owning
    connection as ``shared_queues``.
    """

    def _new_queue(self, queue, **kwargs):
        """Create *queue* on the broker unless it already exists.

        Extra keyword arguments are accepted and ignored.
        """
        self._queue_for(queue)

    def _has_queue(self, queue, **kwargs):
        """Return whether the broker reports that *queue* exists.

        Extra keyword arguments are accepted and ignored.
        """
        return self.shared_queues.has_queue(queue)

    def _get(self, queue, timeout=None):
        """Return the next message from *queue*, creating it if needed.

        *timeout* is accepted for interface compatibility but is not
        forwarded to the broker.
        """
        queue = self._queue_for(queue)
        return self.shared_queues.get(queue)

    def _queue_for(self, queue):
        """Ensure *queue* exists on the broker and return its name."""
        # Membership is tested on the broker side via ``has_queue``, the
        # same call ``_has_queue`` uses.
        if not self.shared_queues.has_queue(queue):
            self.shared_queues.new_queue(queue)
        return queue

    def _put(self, queue, message, **kwargs):
        """Append *message* to *queue*, creating the queue if needed.

        Extra keyword arguments are accepted and ignored.
        """
        queue = self._queue_for(queue)
        self.shared_queues.put(queue, message)

    def _size(self, queue):
        """Return the broker's reported size of *queue*."""
        return self.shared_queues.size(queue)

    def _delete(self, queue, *args, **kwargs):
        """Delete *queue* on the broker.

        Extra positional and keyword arguments are accepted and ignored.
        """
        self.shared_queues.delete(queue)

    def _purge(self, queue):
        """Purge *queue* on the broker and return the broker's result."""
        return self.shared_queues.purge(queue)

    @cached_property
    def shared_queues(self):
        """Broker handle taken from the owning connection (cached)."""
        return self.connection.shared_queues
class Transport(virtual.Transport):
    """Pyro Transport.

    Locates the broker object through a Pyro name server and exposes it to
    channels as :attr:`shared_queues`.
    """

    Channel = Channel

    #: memory backend state is global.
    # TODO: To be checked whether state can be per-Transport
    global_state = virtual.BrokerState()

    default_port = DEFAULT_PORT

    driver_type = driver_name = 'pyro'

    def __init__(self, client, **kwargs):
        super().__init__(client, **kwargs)
        # All instances share the class-level broker state.
        self.state = self.global_state

    def _open(self):
        """Return a Pyro proxy for the broker registered on the name server.

        The name server is looked up on ``client.hostname`` at
        ``default_port``; the broker is looked up under
        ``client.virtual_host``.

        Raises:
            NamingError: if the name server cannot be located, or the
                broker name cannot be resolved.  The original traceback
                is preserved.
        """
        logger.debug("trying Pyro nameserver to find the broker daemon")
        client = self.client

        try:
            name_server = pyro.locateNS(
                host=client.hostname,
                port=self.default_port,
            )
        except NamingError:
            failure = NamingError(E_NAMESERVER.format(client))
            reraise(NamingError, failure, sys.exc_info()[2])

        try:
            # name of registered pyro object
            broker_uri = name_server.lookup(client.virtual_host)
            return pyro.Proxy(broker_uri)
        except NamingError:
            failure = NamingError(E_LOOKUP.format(client))
            reraise(NamingError, failure, sys.exc_info()[2])

    @cached_property
    def shared_queues(self):
        """Broker proxy obtained from :meth:`_open` (cached)."""
        return self._open()
if pyro is not None:
    # Register a converter for the class name "queue.Empty" that yields a
    # fresh ``Empty`` instance.
    SerializerBase.register_dict_to_class("queue.Empty",
                                          lambda cls, data: Empty())

    @pyro.expose
    @pyro.behavior(instance_mode="single")
    class KombuBroker:
        """Kombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.

        Queues are kept in an in-process dict mapping queue name to a
        :class:`queue.Queue`.
        """

        def __init__(self):
            self.queues = {}

        def get_queue_names(self):
            """Return a list of the names of all existing queues."""
            return list(self.queues)

        def new_queue(self, queue):
            """Create *queue*; do nothing if it already exists."""
            if queue in self.queues:
                return  # silently ignore the fact that queue already exists
            self.queues[queue] = Queue()

        def has_queue(self, queue):
            """Return ``True`` if *queue* exists."""
            return queue in self.queues

        def get(self, queue):
            """Return the next message from *queue* without blocking.

            Raises:
                KeyError: if *queue* does not exist.
                queue.Empty: if *queue* holds no messages.
            """
            return self.queues[queue].get(block=False)

        def put(self, queue, message):
            """Append *message* to *queue*.

            Raises:
                KeyError: if *queue* does not exist.
            """
            self.queues[queue].put(message)

        def size(self, queue):
            """Return the approximate number of messages in *queue*.

            Raises:
                KeyError: if *queue* does not exist.
            """
            return self.queues[queue].qsize()

        def delete(self, queue):
            """Remove *queue* and any messages it still holds.

            Raises:
                KeyError: if *queue* does not exist.
            """
            del self.queues[queue]

        def purge(self, queue):
            """Discard every message in *queue*.

            Returns:
                The number of messages that were removed.

            Raises:
                KeyError: if *queue* does not exist.
            """
            pending = self.queues[queue]
            removed = 0
            while True:
                try:
                    # ``Queue.get`` takes ``block``, not ``blocking``.
                    pending.get(block=False)
                except Empty:
                    return removed
                removed += 1
# launch a Kombu Broker daemon with the command:
# ``python -m kombu.transport.pyro``
if __name__ == "__main__":
    print("Launching Broker for Kombu's Pyro transport.")
    with pyro.Daemon() as daemon:
        expecting = "(Expecting a Pyro name server at {}:{})"
        print(expecting.format(pyro.config.NS_HOST, pyro.config.NS_PORT))
        with pyro.locateNS() as ns:
            connect_hint = ("You can connect with Kombu using the url "
                            "'pyro://{}/kombu.broker'")
            print(connect_hint.format(pyro.config.NS_HOST))
            # Register the broker class with the daemon, then publish its
            # URI on the name server under the name "kombu.broker".
            broker_uri = daemon.register(KombuBroker)
            ns.register("kombu.broker", broker_uri)
        # The name-server proxy is released before serving requests.
        daemon.requestLoop()