In many situations servers and clients need to be decoupled. In abstract terms this means
that information producers neither know nor care about the parties that are interested in the information, and
information consumers neither know nor care about the source or sources of the information. All they know is that they
produce or consume information on a certain subject.
This is where the Event Server (ES) fits in. It is a third party that controls the flow of information
about certain subjects ("events"). A publisher uses the Event Server to publish
a message on a specific subject. A
subscriber uses the Event Server to subscribe itself to specific subjects, or to a pattern
that matches certain subjects. As soon as new information on a subject is produced (an "event" occurs)
all subscribers for this subject receive the information. Nobody knows (or cares) about anybody else.
It is important to remember that all events processed by the ES are transient, which means they are not stored.
If there is no listener, all events disappear in the void. The store-and-forward programming model is part of a
messaging service, which is not what the ES is meant to do. It is also important to know that all subscription data
is transient. Once the ES is stopped, all subscriptions are lost. The clients that are subscribed are not notified of
this! If no care is taken, they keep on waiting forever for events to occur, because the ES doesn't know about them
anymore!
Usually your subscribers will receive the events in the order in which they are published. However, this is not guaranteed. If you rely on the exact order of events, you must add some logic to check it (possibly by examining the events' timestamps). On a high-performance LAN the chance of events arriving out of order is very small. It only becomes likely under very high server load, heavy network traffic, or on a high-latency connection (such as a WAN).
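The ordering check suggested above could look like the following minimal sketch. It uses a stand-in FakeEvent class (hypothetical, carrying only the subject, msg and time attributes of an event) so that it runs without Pyro. OrderChecker and its in_order method are illustrative names, not part of Pyro.

```python
import threading


class FakeEvent(object):
    """Stand-in for an ES event; only mimics the subject, msg and time attributes."""
    def __init__(self, subject, msg, time):
        self.subject = subject
        self.msg = msg
        self.time = time


class OrderChecker(object):
    """Remembers the newest timestamp seen per subject and flags older arrivals."""
    def __init__(self):
        self._latest = {}
        self._lock = threading.Lock()    # the check may be called from several threads

    def in_order(self, event):
        key = event.subject.lower()      # subjects are case insensitive
        with self._lock:
            newest = self._latest.get(key)
            if newest is not None and event.time < newest:
                return False             # older than an event we already saw
            self._latest[key] = event.time
            return True


checker = OrderChecker()
arrivals = [(22.44, 100.0), (22.50, 102.0), (22.47, 101.0)]   # (quote, timestamp)
results = [checker.in_order(FakeEvent("StockQuotes", ("SUN", quote), stamp))
           for quote, stamp in arrivals]
print(results)
```

What to do with a late event (drop it, reorder it, log it) depends on the application; the sketch only detects it.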
Another thing to pay attention to is that the ES does not guarantee delivery of events. As mentioned above, the ES does not have a store-and-forward mechanism, but even if everything is up and running, the ES does not enhance Pyro's way of transporting messages. This means that it's still possible (perhaps due to a network error) that an event gets lost. For reliable, guaranteed, asynchronous message delivery you'll have to look somewhere else, sorry ;-)
The ES is a multithreaded server and will not work if your Python installation doesn't have thread support.
Publications are dispatched to the subscribers in different threads, so they don't block each other. Please note that
events may arrive at your listener in multithreaded fashion! Pyro itself starts another thread in your listener to
handle the new event, possibly while the previous one is still being handled. The event method may be
called concurrently from several threads. If you can't handle this, you have to use some form of thread locking
in your client (see Semaphore in the threading module, or Pyro.util.getLockObject).
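A minimal sketch of guarding shared state in an event method with a lock from the standard threading module. The CountingListener class and the way it is driven from plain threads are assumptions for illustration; a real listener would be called by Pyro instead.

```python
import threading


class CountingListener(object):
    """Hypothetical listener whose event() may run in several threads at once."""
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def event(self, event):
        # Only one thread at a time may update the shared counter.
        with self._lock:
            self.count += 1


listener = CountingListener()


def deliver(n):
    """Imitates one delivery thread calling the listener n times."""
    for _ in range(n):
        listener.event(None)     # the event payload is irrelevant for this sketch


threads = [threading.Thread(target=deliver, args=(1000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(listener.count)
```

Keeping the locked region as short as possible matters here, because every delivery thread waits while another one holds the lock.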
To summarize:
Start the ES with the pyro-es command from the bin directory (use pyro-es.cmd on Windows). You can specify
arguments on the command line.
pyro-essvc (Windows-only Event Server 'NT-service' control scripts): use the essvc.cmd script to register the ES
as a service. Make sure you have Pyro properly installed in your Python's site-packages, or register the service
using an account with the correct PYTHONPATH setting, so that Pyro can be located. The ES service logs to
C:\Pyro_ES_svc.log, where C: is your system drive. The service is configured in the registry under the key
HKLM\System\CurrentControlSet\Services\PyroES, and the value under that key is PyroServiceArguments (REG_SZ; it is
asked for and created for you when you run essvc.cmd install from a command prompt).
You can also use python -m to start it: python -m Pyro.EventService.Server
Like the Name Server, if you want to start the Event Server from within your own program,
you can of course start it by executing the start script mentioned above. You could also use the
EventServiceStarter class from the Pyro.EventService.Server module to start
it directly (this is what the script also does). Be sure to start it in a separate process or thread
because it will run in its own endless loop. Have a look at the "AllInOne" example to see how
you can start the Event Server using the EventServiceStarter class.
You will probably have to wait until the ES has been fully started. To do so, call the waitUntilStarted() method on the
starter object. It returns true if the ES has been started, false if it is not yet ready. You can provide a timeout
argument (in seconds).
To start the ES you will first have to start the Name Server because the ES needs that to register itself. After
starting the ES you will then see something like this:
    *** Pyro Event Server ***
    Pyro Server Initialized. Using Pyro V3.2
    URI= PYRO://192.168.1.40:7766/c0a8012804bc0c96774244d7d79d5db3
    Event Server started.
There are two config options specifically for the ES: PYRO_ES_QUEUESIZE and PYRO_ES_BLOCKQUEUE.
Read about them in the Installation and Configuration chapter. By default, the ES will allocate moderately sized queues
for subscribers, and publishers will block if such a queue becomes full (so no events get lost). You might want to
change this behavior. Every subscriber has its own queue, so if the queue of a slow subscriber fills up, other
subscribers are still serviced nicely. By setting PYRO_ES_BLOCKQUEUE to 0, new messages for
full queues are lost. This may be a way to allow slow subscribers to catch up, because new messages are put in the
queue when there is room again. Note that only messages to the slow or frozen subscribers are lost; normally running subscribers
still receive these messages.
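The difference between the two queue policies can be illustrated with the standard queue module. This is only a model of the behavior described above, not the ES's actual implementation; the offer function and the queue size of 2 are made up for the example.

```python
try:
    import queue                 # Python 3
except ImportError:
    import Queue as queue        # Python 2 name of the same module


def offer(q, message, block):
    """Try to queue a message; return False if it was dropped because q is full."""
    try:
        # block=True would wait until the subscriber makes room (no loss);
        # block=False gives up immediately, which models PYRO_ES_BLOCKQUEUE=0.
        q.put(message, block=block)
        return True
    except queue.Full:
        return False


slow_subscriber = queue.Queue(maxsize=2)
accepted = [offer(slow_subscriber, n, block=False) for n in range(4)]
print(accepted)

slow_subscriber.get()            # the subscriber catches up by one message
late = offer(slow_subscriber, 99, block=False)
print(late)
```

The first two messages fit, the next two are dropped, and a later message is accepted again once the subscriber has taken one item out of its queue.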
The ES registers itself in the Name Server under the name given by Pyro.constants.EVENTSERVER_NAME. All subjects are case insensitive, so if you publish
something on the "stockquotes" channel it is the same as if you published it on the "STOCKQuotes" channel.
To publish an event on a certain topic, you need to have a Pyro proxy object for the ES, and then call the
publish method: publish(subjects, message), where subjects is a subject name or
a sequence of one or more subject names (strings), and message is the actual message. The message can be
any Python object (as long as it can be pickled):
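    import Pyro.core
    import Pyro.constants

    Pyro.core.initClient()
    # look up the ES through the Name Server and get a proxy for it
    es = Pyro.core.getProxyForURI("PYRONAME://" + Pyro.constants.EVENTSERVER_NAME)
    # publish a (symbol, quote) tuple on the "StockQuotes" subject
    es.publish("StockQuotes", ("SUN", 22.44))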
If you think this is too much work, or if you want to abstract from the Pyro details, you can use the
Publisher base class that is provided in Pyro.EventService.Clients.
Subclass your event publishers from this class. The init takes care of locating the ES, and you can just call the
publish(subjects, message) method of the base class. No ES proxy code needed:
    import Pyro.EventService.Clients

    class StockPublisher(Pyro.EventService.Clients.Publisher):
        def __init__(self):
            Pyro.EventService.Clients.Publisher.__init__(self)

        def publishQuote(self, symbol, quote):
            self.publish("StockQuotes", (symbol, quote))

    sp = StockPublisher()
    sp.publishQuote("SUN", 22.44)
The __init__ of both the Publisher and the Subscriber takes an optional ident
argument. Use this to specify the authentication passphrase that will be used to connect
to the ES (and also to connect to the Name Server).
The __init__ of both the Publisher and the Subscriber also takes an optional esURI
argument. Set it to the URI of the Event Server (string format)
if you don't have a Name Server running. Look at the 'stockquotes' example to see how this can be done.
Note that the Event Server usually prints its URI when started.
Event subscribers are a little more involved than event publishers. This is because they are full-blown
Pyro server objects that receive calls from the ES when an event is published on one of the topics
you've subscribed to! Therefore, your clients (subscribers) need to call the Pyro daemon's handleRequests or
requestLoop (just like a Pyro server). They also have to call Pyro.core.initServer() because
they also act as a Pyro server. Furthermore, they usually have to run as a multithreaded server, because
the ES may call them as soon as a new event arrives, even if they are not done processing the previous event.
Single-threaded servers will build up a backlog of undelivered events if this happens. You still get
all events (with the original timestamp, so you could skip events that "have expired" to catch
up). You can change this behavior with the PYRO_ES_QUEUESIZE and PYRO_ES_BLOCKQUEUE config items mentioned earlier.
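Skipping expired events to catch up could be done with a small age check on the event's timestamp, as in the sketch below. The is_expired helper and the MAX_AGE value are hypothetical. Because the timestamp is taken on the server, comparing it with the local clock assumes that both machines' clocks are reasonably in sync.

```python
import time

MAX_AGE = 5.0    # seconds an event stays useful; arbitrary value for this sketch


def is_expired(event_time, now=None, max_age=MAX_AGE):
    """Return True if an event with timestamp event_time is older than max_age."""
    if now is None:
        now = time.time()
    return (now - event_time) > max_age


# Timestamps are passed in explicitly here so the outcome does not depend on the clock.
fresh = is_expired(100.0, now=103.0)
stale = is_expired(100.0, now=106.0)
print(fresh, stale)
```

Inside an event method the check would be called as is_expired(event.time), returning early when it is true.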
subscribe(subjects, subscriber) | Subscribe to events. subjects is a subject name or a sequence of one or more subject names (strings), and subscriber is a proxy for your subscriber object.
subscribeMatch(subjectPatterns, subscriber) | Subscribe to events based on patterns. subjectPatterns is a subject pattern or a sequence of one or more subject patterns (strings), and subscriber is a proxy for your subscriber object.
unsubscribe(subjects, subscriber) | Unsubscribe from subjects. subjects is a subject or subject pattern or a sequence thereof, and subscriber is a proxy for your subscriber object.
But first, create a subscriber object, which must be a Pyro object (or use delegation). The subscriber object
should have an event(self, event) method. This method is called by the ES if a new event arrives on a
channel you subscribed to. event is a Pyro.EventService.Event object, which has the
following attributes:
msg | The actual message that was published. Can be any Python object.
subject | The subject (string) on which the message was published (the topic name).
time | The event's timestamp (from the server, synchronised for all subscribers). A float, taken from time.time().
To subscribe, call the subscribe method of the ES with the desired subject(s) and a proxy for your
subscriber object. If you want to subscribe to multiple subjects based on pattern matching, call the
subscribeMatch method instead with the desired subject pattern(s) and a proxy for your subscriber
object. The patterns are standard re-style regular expressions. See the standard re module
for more information. The pattern '^STOCKQUOTE\\.S.*$' matches STOCKQUOTE.SUN and STOCKQUOTE.SAP, but not
STOCKQUOTE.IBM, NYSE.STOCKQUOTE.SUN, etcetera. Once more: the subjects are case insensitive. The patterns are matched
case insensitively too.
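The example pattern can be tried with the re module directly. The sketch assumes that the case-insensitive matching behaves like re.match with the re.IGNORECASE flag; how the ES compiles its patterns internally is not shown in this chapter.

```python
import re

# Same pattern as in the text; the doubled backslash yields a literal dot in the regex.
pattern = re.compile('^STOCKQUOTE\\.S.*$', re.IGNORECASE)

subjects = ["STOCKQUOTE.SUN", "stockquote.sap", "STOCKQUOTE.IBM", "NYSE.STOCKQUOTE.SUN"]
matching = [s for s in subjects if pattern.match(s)]
print(matching)
```

Testing a pattern this way before passing it to subscribeMatch is a cheap way to catch a missing anchor or an unescaped dot.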
To unsubscribe, call the unsubscribe method with the subject(s) or pattern(s) you want to unsubscribe
from, and a proxy for the subscriber object that has been previously subscribed. This will remove the subscriber from
the subscription list and also from the pattern match list if the subject occurs as a pattern there. The ES
(actually, Pyro) is smart enough to see if multiple (different) proxy objects point to the same subscriber object and
will act correctly.
As with publishers, you can use the Subscriber base class provided in Pyro.EventService.Clients.
Subclass your event listeners (subscribers) from this class. The init takes
care of locating the ES, and you can just call the subscribe(subjects), subscribeMatch(subjectPatterns)
and unsubscribe(subjects)
methods on the object itself. No ES proxy code needed. This base class also starts a Pyro daemon, and by calling
listen(), your code starts listening for incoming events. When you want to abort the event loop, you have
to call self.abort() from within the event handler method.
The multithreading of the event method can be controlled using the
setThreading(threading) method. If you set threading=0, the threading will be switched off (it
is on by default unless otherwise configured). Your events will then arrive purely sequentially: each event is delivered only after the previous one has been processed.
Call this method before entering requestLoop, handleRequests or listen.
A minimalistic event listener that prints the stockquote events published by the example code above:
    from Pyro.EventService.Clients import Subscriber

    class StockSubscriber(Subscriber):
        def __init__(self):
            Subscriber.__init__(self)
            self.subscribe("StockQuotes")

        def event(self, event):
            print "Got a stockquote: %s=%f" % (event.msg)

    sub = StockSubscriber()
    sub.listen()
Use the setThreading(threading) method of the Subscriber base class to control
the threading. If you set threading=0, the threading will be switched off (it is on by default). But a better way to
process events sequentially is to use Python's Queue module: you create a Queue in your
subscriber process that is filled with arriving events, and you have a single event consumer that takes
events out of the queue one by one:
Pyro Event Server | multithreaded
↓
Subscriber(s) | multithreaded
↓
Queue.Queue
↓
Consumer/Worker | singlethreaded
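The arrangement in the diagram could be sketched as follows. To stay runnable without Pyro, the multithreaded delivery is imitated with plain threads. QueueingSubscriber and the None stop marker are illustrative choices; in real code the event method would live on a Subscriber subclass.

```python
import threading
try:
    import queue                 # Python 3
except ImportError:
    import Queue as queue        # Python 2 name of the same module


class QueueingSubscriber(object):
    """Hypothetical subscriber: event() only enqueues, so it returns quickly."""
    def __init__(self):
        self.events = queue.Queue()

    def event(self, event):
        self.events.put(event)   # Queue is thread-safe, no extra locking needed


def consume(q, handled):
    """Single worker: handles events strictly one after another."""
    while True:
        item = q.get()
        if item is None:         # stop marker used by this sketch
            break
        handled.append(item)


sub = QueueingSubscriber()
handled = []
worker = threading.Thread(target=consume, args=(sub.events, handled))
worker.start()

# Imitate the ES delivering events from several threads at once.
senders = [threading.Thread(target=sub.event, args=("quote-%d" % i,)) for i in range(5)]
for t in senders:
    t.start()
for t in senders:
    t.join()

sub.events.put(None)             # all events are queued; tell the worker to stop
worker.join()
print(sorted(handled))
```

Because the event method does nothing but enqueue, the delivery threads are released almost immediately, while all real processing happens in the single worker.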