This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for billiard.compat
import errno
import numbers
import os
import subprocess
import sys
from itertools import zip_longest
if sys.platform == 'win32':
try:
import _winapi # noqa
except ImportError: # pragma: no cover
from _multiprocessing import win32 as _winapi # noqa
else:
_winapi = None # noqa
try:
import resource
except ImportError: # pragma: no cover
resource = None
from io import UnsupportedOperation
FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
if hasattr(os, 'write'):
__write__ = os.write
def send_offset(fd, buf, offset):
return __write__(fd, buf[offset:])
else: # non-posix platform
def send_offset(fd, buf, offset): # noqa
raise NotImplementedError('send_offset')
try:
fsencode = os.fsencode
fsdecode = os.fsdecode
except AttributeError:
def _fscodec():
encoding = sys.getfilesystemencoding()
if encoding == 'mbcs':
errors = 'strict'
else:
errors = 'surrogateescape'
def fsencode(filename):
"""
Encode filename to the filesystem encoding with 'surrogateescape'
error handler, return bytes unchanged. On Windows, use 'strict'
error handler if the file system encoding is 'mbcs' (which is the
default encoding).
"""
if isinstance(filename, bytes):
return filename
elif isinstance(filename, str):
return filename.encode(encoding, errors)
else:
raise TypeError("expect bytes or str, not %s"
% type(filename).__name__)
def fsdecode(filename):
"""
Decode filename from the filesystem encoding with 'surrogateescape'
error handler, return str unchanged. On Windows, use 'strict' error
handler if the file system encoding is 'mbcs' (which is the default
encoding).
"""
if isinstance(filename, str):
return filename
elif isinstance(filename, bytes):
return filename.decode(encoding, errors)
else:
raise TypeError("expect bytes or str, not %s"
% type(filename).__name__)
return fsencode, fsdecode
fsencode, fsdecode = _fscodec()
del _fscodec
def maybe_fileno(f):
"""Get object fileno, or :const:`None` if not defined."""
if isinstance(f, numbers.Integral):
return f
try:
return f.fileno()
except FILENO_ERRORS:
pass
[docs]def get_fdmax(default=None):
"""Return the maximum number of open file descriptors
on this system.
:keyword default: Value returned if there's no file
descriptor limit.
"""
try:
return os.sysconf('SC_OPEN_MAX')
except:
pass
if resource is None: # Windows
return default
fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if fdmax == resource.RLIM_INFINITY:
return default
return fdmax
def uniq(it):
"""Return all unique elements in ``it``, preserving order."""
seen = set()
return (seen.add(obj) or obj for obj in it if obj not in seen)
try:
closerange = os.closerange
except AttributeError:
def closerange(fd_low, fd_high): # noqa
for fd in reversed(range(fd_low, fd_high)):
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
def close_open_fds(keep=None):
# must make sure this is 0-inclusive (Issue #celery/1882)
keep = list(uniq(sorted(
f for f in map(maybe_fileno, keep or []) if f is not None
)))
maxfd = get_fdmax(default=2048)
kL, kH = iter([-1] + keep), iter(keep + [maxfd])
for low, high in zip_longest(kL, kH):
if low + 1 != high:
closerange(low + 1, high)
else:
[docs] def close_open_fds(keep=None): # noqa
keep = [maybe_fileno(f)
for f in (keep or []) if maybe_fileno(f) is not None]
for fd in reversed(range(get_fdmax(default=2048))):
if fd not in keep:
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
def get_errno(exc):
""":exc:`socket.error` and :exc:`IOError` first got
the ``.errno`` attribute in Py2.7"""
try:
return exc.errno
except AttributeError:
return 0
try:
import _posixsubprocess
except ImportError:
def spawnv_passfds(path, args, passfds):
if sys.platform != 'win32':
# when not using _posixsubprocess (on earlier python) and not on
# windows, we want to keep stdout/stderr open...
passfds = passfds + [
maybe_fileno(sys.stdout),
maybe_fileno(sys.stderr),
]
pid = os.fork()
if not pid:
close_open_fds(keep=sorted(f for f in passfds if f))
os.execv(fsencode(path), args)
return pid
else:
def spawnv_passfds(path, args, passfds):
passfds = sorted(passfds)
errpipe_read, errpipe_write = os.pipe()
try:
args = [
args, [fsencode(path)], True, tuple(passfds), None, None,
-1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
False, False]
if sys.version_info >= (3, 11):
args.append(-1) # process_group
if sys.version_info >= (3, 9):
args.extend((None, None, None, -1)) # group, extra_groups, user, umask
args.append(None) # preexec_fn
if sys.version_info >= (3, 11):
args.append(subprocess._USE_VFORK)
return _posixsubprocess.fork_exec(*args)
finally:
os.close(errpipe_read)
os.close(errpipe_write)
if sys.platform == 'win32':
def setblocking(handle, blocking):
raise NotImplementedError('setblocking not implemented on win32')
def isblocking(handle):
raise NotImplementedError('isblocking not implemented on win32')
else:
from os import O_NONBLOCK
from fcntl import fcntl, F_GETFL, F_SETFL
def isblocking(handle): # noqa
return not (fcntl(handle, F_GETFL) & O_NONBLOCK)
def setblocking(handle, blocking): # noqa
flags = fcntl(handle, F_GETFL, 0)
fcntl(
handle, F_SETFL,
flags & (~O_NONBLOCK) if blocking else flags | O_NONBLOCK,
)
E_PSUTIL_MISSING = """
On Windows, the ability to inspect memory usage requires the psutil library.
You can install it using pip:
$ pip install psutil
"""
E_RESOURCE_MISSING = """
Your platform ({0}) does not seem to have the `resource.getrusage' function.
Please open an issue so that we can add support for this platform.
"""
if sys.platform == 'win32':
try:
import psutil
except ImportError: # pragma: no cover
psutil = None # noqa
def mem_rss():
# type () -> int
if psutil is None:
raise ImportError(E_PSUTIL_MISSING.strip())
return int(psutil.Process(os.getpid()).memory_info()[0] / 1024.0)
else:
try:
from resource import getrusage, RUSAGE_SELF
except ImportError: # pragma: no cover
getrusage = RUSAGE_SELF = None # noqa
if 'bsd' in sys.platform or sys.platform == 'darwin':
# On BSD platforms :man:`getrusage(2)` ru_maxrss field is in bytes.
def maxrss_to_kb(v):
# type: (SupportsInt) -> int
return int(v) / 1024.0
else:
# On Linux it's kilobytes.
def maxrss_to_kb(v):
# type: (SupportsInt) -> int
return int(v)
def mem_rss():
# type () -> int
if resource is None:
raise ImportError(E_RESOURCE_MISSING.strip().format(sys.platform))
return maxrss_to_kb(getrusage(RUSAGE_SELF).ru_maxrss)