Queue Support

The queue support module makes it possible to add log records to a queue system. This is useful for distributed setups where you want multiple processes to log to the same backend. Currently supported are ZeroMQ as well as the multiprocessing Queue class.

ZeroMQ

class logbook.queues.ZeroMQHandler(uri=None, level=0, filter=None, bubble=False, context=None, multi=False)

A handler that acts as a ZeroMQ publisher, which publishes each record as json dump. Requires the pyzmq library.

The queue will be filled with JSON exported log records. To receive such log records from a queue you can use the ZeroMQSubscriber.

If multi is set to True, the handler will use a PUSH socket to publish the records. This allows multiple handlers to use the same uri. The records can be received by using the ZeroMQSubscriber with multi set to True.

Example setup:

handler = ZeroMQHandler('tcp://127.0.0.1:5000')
close(linger=-1)

Tidy up any resources used by the handler. This is automatically called by the destructor of the class as well, but explicit calls are encouraged. Make sure that multiple calls to close are possible.

context

the zero mq context

emit(record)

Emit the specified logging record. This should take the record and deliver it to whereever the handler sends formatted log records.

export_record(record)

Exports the record into a dictionary ready for JSON dumping.

socket

the zero mq socket.

class logbook.queues.ZeroMQSubscriber(uri=None, context=None, multi=False)

A helper that acts as ZeroMQ subscriber and will dispatch received log records to the active handler setup. There are multiple ways to use this class.

It can be used to receive log records from a queue:

subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()

If multi is set to True, the subscriber will use a PULL socket and listen to records published by a PUSH socket (usually via a ZeroMQHandler with multi set to True). This allows a single subscriber to dispatch multiple handlers.

close()

Closes the zero mq socket.

context

the zero mq context

dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

socket

the zero mq socket.

AMQP Message Queues

class logbook.queues.MessageQueueHandler(uri=None, queue='logging', level=0, filter=None, bubble=False)

A handler that acts as a message queue publisher, which publishes each record as json dump. Requires the kombu module.

The queue will be filled with JSON exported log records. To receive such log records from a queue you can use the MessageQueueSubscriber.

For an AMQP backend such as RabbitMQ:

handler = MessageQueueHandler('amqp://guest:guest@localhost//')

This requires the py-amqp or the librabbitmq client library.

For Redis (requires redis client library):

handler = MessageQueueHandler('redis://localhost:8889/0')

For MongoDB (requires pymongo):

handler = MessageQueueHandler('mongodb://localhost:27017/logging')

Several other backends are also supported. Refer to the kombu documentation

close()

Tidy up any resources used by the handler. This is automatically called by the destructor of the class as well, but explicit calls are encouraged. Make sure that multiple calls to close are possible.

emit(record)

Emit the specified logging record. This should take the record and deliver it to whereever the handler sends formatted log records.

export_record(record)

Exports the record into a dictionary ready for JSON dumping.

class logbook.queues.MessageQueueSubscriber(uri=None, queue='logging')

A helper that acts as a message queue subscriber and will dispatch received log records to the active handler setup. There are multiple ways to use this class.

It can be used to receive log records from a queue:

subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()
recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

MultiProcessing

class logbook.queues.MultiProcessingHandler(queue, level=0, filter=None, bubble=False)

Implements a handler that dispatches over a queue to a different process. It is connected to a subscriber with a multiprocessing.Queue:

from multiprocessing import Queue
from logbook.queues import MultiProcessingHandler
queue = Queue(-1)
handler = MultiProcessingHandler(queue)
emit(record)

Emit the specified logging record. This should take the record and deliver it to whereever the handler sends formatted log records.

class logbook.queues.MultiProcessingSubscriber(queue=None)

Receives log records from the given multiprocessing queue and dispatches them to the active handler setup. Make sure to use the same queue for both handler and subscriber. Idaelly the queue is set up with maximum size (-1):

from multiprocessing import Queue
queue = Queue(-1)

It can be used to receive log records from a queue:

subscriber = MultiProcessingSubscriber(queue)
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = MultiProcessingSubscriber(queue)
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = MultiProcessingSubscriber(queue)
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()

If no queue is provided the subscriber will create one. This one can the be used by handlers:

subscriber = MultiProcessingSubscriber()
handler = MultiProcessingHandler(subscriber.queue)
dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

Subclasses have to override this.

Other

class logbook.queues.ThreadedWrapperHandler(handler, maxsize=0)

This handled uses a single background thread to dispatch log records to a specific other handler using an internal queue. The idea is that if you are using a handler that requires some time to hand off the log records (such as the mail handler) and would block your request, you can let Logbook do that in a background thread.

The threaded wrapper handler will automatically adopt the methods and properties of the wrapped handler. All the values will be reflected:

>>> twh = ThreadedWrapperHandler(TestHandler())
>>> from logbook import WARNING
>>> twh.level_name = 'WARNING'
>>> twh.handler.level_name
'WARNING'
close()

Tidy up any resources used by the handler. This is automatically called by the destructor of the class as well, but explicit calls are encouraged. Make sure that multiple calls to close are possible.

emit(record)

Emit the specified logging record. This should take the record and deliver it to whereever the handler sends formatted log records.

emit_batch(records, reason)

Some handlers may internally queue up records and want to forward them at once to another handler. For example the FingersCrossedHandler internally buffers records until a level threshold is reached in which case the buffer is sent to this method and not emit() for each record.

The default behaviour is to call emit() for each record in the buffer, but handlers can use this to optimize log handling. For instance the mail handler will try to batch up items into one mail and not to emit mails for each record in the buffer.

Note that unlike emit() there is no wrapper method like handle() that does error handling. The reason is that this is intended to be used by other handlers which are already protected against internal breakage.

reason is a string that specifies the rason why emit_batch() was called, and not emit(). The following are valid values:

'buffer'

Records were buffered for performance reasons or because the records were sent to another process and buffering was the only possible way. For most handlers this should be equivalent to calling emit() for each record.

'escalation'

Escalation means that records were buffered in case the threshold was exceeded. In this case, the last record in the iterable is the record that triggered the call.

'group'

All the records in the iterable belong to the same logical component and happened in the same process. For example there was a long running computation and the handler is invoked with a bunch of records that happened there. This is similar to the escalation reason, just that the first one is the significant one, not the last.

If a subclass overrides this and does not want to handle a specific reason it must call into the superclass because more reasons might appear in future releases.

Example implementation:

def emit_batch(self, records, reason):
    if reason not in ('escalation', 'group'):
        Handler.emit_batch(self, records, reason)
    ...
class logbook.queues.SubscriberGroup(subscribers=None, queue_limit=10)

This is a subscriber which represents a group of subscribers.

This is helpful if you are writing a server-like application which has “slaves”. This way a user is easily able to view every log record which happened somewhere in the entire system without having to check every single slave:

subscribers = SubscriberGroup([
    MultiProcessingSubscriber(queue),
    ZeroMQSubscriber('tcp://127.0.0.1:5000')
])
with target_handler:
    subscribers.dispatch_forever()
add(subscriber)

Adds the given subscriber to the group.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

Subclasses have to override this.

stop()

Stops the group from internally recieving any more messages, once the internal queue is exhausted recv() will always return None.

Base Interface

class logbook.queues.SubscriberBase

Baseclass for all subscribers.

dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

Subclasses have to override this.

class logbook.queues.ThreadController(subscriber, setup=None)

A helper class used by queue subscribers to control the background thread. This is usually created and started in one go by dispatch_in_background() or a comparable function.

start()

Starts the task thread.

stop()

Stops the task thread.

class logbook.queues.TWHThreadController(wrapper_handler)

A very basic thread controller that pulls things in from a queue and sends it to a handler. Both queue and handler are taken from the passed ThreadedWrapperHandler.

start()

Starts the task thread.

stop()

Stops the task thread.