This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.utils.eventio
"""Selector Utilities."""
import errno
import math
import select as __select__
import sys
from numbers import Integral
from . import fileno
from .compat import detect_environment
__all__ = ('poll',)
_selectf = __select__.select
_selecterr = __select__.error
xpoll = getattr(__select__, 'poll', None)
epoll = getattr(__select__, 'epoll', None)
kqueue = getattr(__select__, 'kqueue', None)
kevent = getattr(__select__, 'kevent', None)
KQ_EV_ADD = getattr(__select__, 'KQ_EV_ADD', 1)
KQ_EV_DELETE = getattr(__select__, 'KQ_EV_DELETE', 2)
KQ_EV_ENABLE = getattr(__select__, 'KQ_EV_ENABLE', 4)
KQ_EV_CLEAR = getattr(__select__, 'KQ_EV_CLEAR', 32)
KQ_EV_ERROR = getattr(__select__, 'KQ_EV_ERROR', 16384)
KQ_EV_EOF = getattr(__select__, 'KQ_EV_EOF', 32768)
KQ_FILTER_READ = getattr(__select__, 'KQ_FILTER_READ', -1)
KQ_FILTER_WRITE = getattr(__select__, 'KQ_FILTER_WRITE', -2)
KQ_FILTER_AIO = getattr(__select__, 'KQ_FILTER_AIO', -3)
KQ_FILTER_VNODE = getattr(__select__, 'KQ_FILTER_VNODE', -4)
KQ_FILTER_PROC = getattr(__select__, 'KQ_FILTER_PROC', -5)
KQ_FILTER_SIGNAL = getattr(__select__, 'KQ_FILTER_SIGNAL', -6)
KQ_FILTER_TIMER = getattr(__select__, 'KQ_FILTER_TIMER', -7)
KQ_NOTE_LOWAT = getattr(__select__, 'KQ_NOTE_LOWAT', 1)
KQ_NOTE_DELETE = getattr(__select__, 'KQ_NOTE_DELETE', 1)
KQ_NOTE_WRITE = getattr(__select__, 'KQ_NOTE_WRITE', 2)
KQ_NOTE_EXTEND = getattr(__select__, 'KQ_NOTE_EXTEND', 4)
KQ_NOTE_ATTRIB = getattr(__select__, 'KQ_NOTE_ATTRIB', 8)
KQ_NOTE_LINK = getattr(__select__, 'KQ_NOTE_LINK', 16)
KQ_NOTE_RENAME = getattr(__select__, 'KQ_NOTE_RENAME', 32)
KQ_NOTE_REVOKE = getattr(__select__, 'KQ_NOTE_REVOKE', 64)
POLLIN = getattr(__select__, 'POLLIN', 1)
POLLOUT = getattr(__select__, 'POLLOUT', 4)
POLLERR = getattr(__select__, 'POLLERR', 8)
POLLHUP = getattr(__select__, 'POLLHUP', 16)
POLLNVAL = getattr(__select__, 'POLLNVAL', 32)
READ = POLL_READ = 0x001
WRITE = POLL_WRITE = 0x004
ERR = POLL_ERR = 0x008 | 0x010
try:
SELECT_BAD_FD = {errno.EBADF, errno.WSAENOTSOCK}
except AttributeError:
SELECT_BAD_FD = {errno.EBADF}
class _epoll:
def __init__(self):
self._epoll = epoll()
def register(self, fd, events):
try:
self._epoll.register(fd, events)
except Exception as exc:
if getattr(exc, 'errno', None) != errno.EEXIST:
raise
return fd
def unregister(self, fd):
try:
self._epoll.unregister(fd)
except (OSError, ValueError, KeyError, TypeError):
pass
except OSError as exc:
if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM):
raise
def poll(self, timeout):
try:
return self._epoll.poll(timeout if timeout is not None else -1)
except Exception as exc:
if getattr(exc, 'errno', None) != errno.EINTR:
raise
def close(self):
self._epoll.close()
class _kqueue:
w_fflags = (KQ_NOTE_WRITE | KQ_NOTE_EXTEND |
KQ_NOTE_ATTRIB | KQ_NOTE_DELETE)
def __init__(self):
self._kqueue = kqueue()
self._active = {}
self.on_file_change = None
self._kcontrol = self._kqueue.control
def register(self, fd, events):
self._control(fd, events, KQ_EV_ADD)
self._active[fd] = events
return fd
def unregister(self, fd):
events = self._active.pop(fd, None)
if events:
try:
self._control(fd, events, KQ_EV_DELETE)
except OSError:
pass
def watch_file(self, fd):
ev = kevent(fd,
filter=KQ_FILTER_VNODE,
flags=KQ_EV_ADD | KQ_EV_ENABLE | KQ_EV_CLEAR,
fflags=self.w_fflags)
self._kcontrol([ev], 0)
def unwatch_file(self, fd):
ev = kevent(fd,
filter=KQ_FILTER_VNODE,
flags=KQ_EV_DELETE,
fflags=self.w_fflags)
self._kcontrol([ev], 0)
def _control(self, fd, events, flags):
if not events:
return
kevents = []
if events & WRITE:
kevents.append(kevent(fd,
filter=KQ_FILTER_WRITE,
flags=flags))
if not kevents or events & READ:
kevents.append(
kevent(fd, filter=KQ_FILTER_READ, flags=flags),
)
control = self._kcontrol
for e in kevents:
try:
control([e], 0)
except ValueError:
pass
def poll(self, timeout):
try:
kevents = self._kcontrol(None, 1000, timeout)
except Exception as exc:
if getattr(exc, 'errno', None) == errno.EINTR:
return
raise
events, file_changes = {}, []
for k in kevents:
fd = k.ident
if k.filter == KQ_FILTER_READ:
events[fd] = events.get(fd, 0) | READ
elif k.filter == KQ_FILTER_WRITE:
if k.flags & KQ_EV_EOF:
events[fd] = ERR
else:
events[fd] = events.get(fd, 0) | WRITE
elif k.filter == KQ_EV_ERROR:
events[fd] = events.get(fd, 0) | ERR
elif k.filter == KQ_FILTER_VNODE:
if k.fflags & KQ_NOTE_DELETE:
self.unregister(fd)
file_changes.append(k)
if file_changes:
self.on_file_change(file_changes)
return list(events.items())
def close(self):
self._kqueue.close()
class _poll:
def __init__(self):
self._poller = xpoll()
self._quick_poll = self._poller.poll
self._quick_register = self._poller.register
self._quick_unregister = self._poller.unregister
def register(self, fd, events):
fd = fileno(fd)
poll_flags = 0
if events & ERR:
poll_flags |= POLLERR
if events & WRITE:
poll_flags |= POLLOUT
if events & READ:
poll_flags |= POLLIN
self._quick_register(fd, poll_flags)
return fd
def unregister(self, fd):
try:
fd = fileno(fd)
except OSError as exc:
# we don't know the previous fd of this object
# but it will be removed by the next poll iteration.
if getattr(exc, 'errno', None) in SELECT_BAD_FD:
return fd
raise
self._quick_unregister(fd)
return fd
def poll(self, timeout, round=math.ceil,
POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
try:
event_list = self._quick_poll(timeout)
except (_selecterr, OSError) as exc:
if getattr(exc, 'errno', None) == errno.EINTR:
return
raise
ready = []
for fd, event in event_list:
events = 0
if event & POLLIN:
events |= READ
if event & POLLOUT:
events |= WRITE
if event & POLLERR or event & POLLNVAL or event & POLLHUP:
events |= ERR
assert events
if not isinstance(fd, Integral):
fd = fd.fileno()
ready.append((fd, events))
return ready
def close(self):
self._poller = None
class _select:
def __init__(self):
self._all = (self._rfd,
self._wfd,
self._efd) = set(), set(), set()
def register(self, fd, events):
fd = fileno(fd)
if events & ERR:
self._efd.add(fd)
if events & WRITE:
self._wfd.add(fd)
if events & READ:
self._rfd.add(fd)
return fd
def _remove_bad(self):
for fd in self._rfd | self._wfd | self._efd:
try:
_selectf([fd], [], [], 0)
except (_selecterr, OSError) as exc:
if getattr(exc, 'errno', None) in SELECT_BAD_FD:
self.unregister(fd)
def unregister(self, fd):
try:
fd = fileno(fd)
except OSError as exc:
# we don't know the previous fd of this object
# but it will be removed by the next poll iteration.
if getattr(exc, 'errno', None) in SELECT_BAD_FD:
return
raise
self._rfd.discard(fd)
self._wfd.discard(fd)
self._efd.discard(fd)
def poll(self, timeout):
try:
read, write, error = _selectf(
self._rfd, self._wfd, self._efd, timeout,
)
except (_selecterr, OSError) as exc:
if getattr(exc, 'errno', None) == errno.EINTR:
return
elif getattr(exc, 'errno', None) in SELECT_BAD_FD:
return self._remove_bad()
raise
events = {}
for fd in read:
if not isinstance(fd, Integral):
fd = fd.fileno()
events[fd] = events.get(fd, 0) | READ
for fd in write:
if not isinstance(fd, Integral):
fd = fd.fileno()
events[fd] = events.get(fd, 0) | WRITE
for fd in error:
if not isinstance(fd, Integral):
fd = fd.fileno()
events[fd] = events.get(fd, 0) | ERR
return list(events.items())
def close(self):
self._rfd.clear()
self._wfd.clear()
self._efd.clear()
def _get_poller():
if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
# Py2.6+ Linux
return _epoll
elif kqueue and 'netbsd' in sys.platform:
return _kqueue
elif xpoll:
return _poll
else:
return _select
[docs]def poll(*args, **kwargs):
"""Create new poller instance."""
return _get_poller()(*args, **kwargs)