This document describes the current stable version of Celery (5.2). For development docs, go here.
Source code for celery.worker.autoscale
"""Pool Autoscaling.
This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.
The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
"""
import os
import threading
from time import monotonic, sleep
from kombu.asynchronous.semaphore import DummyLock
from celery import bootsteps
from celery.utils.log import get_logger
from celery.utils.threads import bgThread
from . import state
from .components import Pool
__all__ = ('Autoscaler', 'WorkerComponent')
logger = get_logger(__name__)
debug, info, error = logger.debug, logger.info, logger.error
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
[docs]class WorkerComponent(bootsteps.StartStopStep):
"""Bootstep that starts the autoscaler thread/timer in the worker."""
label = 'Autoscaler'
conditional = True
requires = (Pool,)
def __init__(self, w, **kwargs):
self.enabled = w.autoscale
w.autoscaler = None
[docs] def create(self, w):
scaler = w.autoscaler = self.instantiate(
w.autoscaler_cls,
w.pool, w.max_concurrency, w.min_concurrency,
worker=w, mutex=DummyLock() if w.use_eventloop else None,
)
return scaler if not w.use_eventloop else None
[docs] def register_with_event_loop(self, w, hub):
w.consumer.on_task_message.add(w.autoscaler.maybe_scale)
hub.call_repeatedly(
w.autoscaler.keepalive, w.autoscaler.maybe_scale,
)
[docs] def info(self, w):
"""Return `Autoscaler` info."""
return {'autoscaler': w.autoscaler.info()}
[docs]class Autoscaler(bgThread):
"""Background thread to autoscale pool workers."""
def __init__(self, pool, max_concurrency,
min_concurrency=0, worker=None,
keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
super().__init__()
self.pool = pool
self.mutex = mutex or threading.Lock()
self.max_concurrency = max_concurrency
self.min_concurrency = min_concurrency
self.keepalive = keepalive
self._last_scale_up = None
self.worker = worker
assert self.keepalive, 'cannot scale down too fast.'
def _maybe_scale(self, req=None):
procs = self.processes
cur = min(self.qty, self.max_concurrency)
if cur > procs:
self.scale_up(cur - procs)
return True
cur = max(self.qty, self.min_concurrency)
if cur < procs:
self.scale_down(procs - cur)
return True
[docs] def update(self, max=None, min=None):
with self.mutex:
if max is not None:
if max < self.processes:
self._shrink(self.processes - max)
self._update_consumer_prefetch_count(max)
self.max_concurrency = max
if min is not None:
if min > self.processes:
self._grow(min - self.processes)
self.min_concurrency = min
return self.max_concurrency, self.min_concurrency
[docs] def scale_down(self, n):
if self._last_scale_up and (
monotonic() - self._last_scale_up > self.keepalive):
return self._shrink(n)
def _grow(self, n):
info('Scaling up %s processes.', n)
self.pool.grow(n)
def _shrink(self, n):
info('Scaling down %s processes.', n)
try:
self.pool.shrink(n)
except ValueError:
debug("Autoscaler won't scale down: all processes busy.")
except Exception as exc:
error('Autoscaler: scale_down: %r', exc, exc_info=True)
def _update_consumer_prefetch_count(self, new_max):
diff = new_max - self.max_concurrency
if diff:
self.worker.consumer._update_prefetch_count(
diff
)
[docs] def info(self):
return {
'max': self.max_concurrency,
'min': self.min_concurrency,
'current': self.processes,
'qty': self.qty,
}
@property
def qty(self):
return len(state.reserved_requests)
@property
def processes(self):
return self.pool.num_processes