# -*- coding: utf-8 -*- # pylint: disable=C0111,C0103,R0205 import functools import logging import threading import time import pika from pika.exchange_type import ExchangeType LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' '-35s %(lineno) -5d: %(message)s') LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) def ack_message(ch, delivery_tag): """Note that `ch` must be the same pika channel instance via which the message being ACKed was retrieved (AMQP protocol constraint). """ if ch.is_open: ch.basic_ack(delivery_tag) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass def do_work(conn, ch, delivery_tag, body): thread_id = threading.get_ident() LOGGER.info('Thread id: %s Delivery tag: %s Message body: %s', thread_id, delivery_tag, body) # Sleeping to simulate 10 seconds of work time.sleep(10) cb = functools.partial(ack_message, ch, delivery_tag) conn.add_callback_threadsafe(cb) def on_message(ch, method_frame, _header_frame, body, args): (conn, thrds) = args delivery_tag = method_frame.delivery_tag t = threading.Thread(target=do_work, args=(conn, ch, delivery_tag, body)) t.start() thrds.append(t) credentials = pika.PlainCredentials('guest', 'guest') # Note: sending a short heartbeat to prove that heartbeats are still # sent even though the worker simulates long-running work parameters = pika.ConnectionParameters( 'localhost', credentials=credentials, heartbeat=5) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare( exchange="test_exchange", exchange_type=ExchangeType.direct, passive=False, durable=True, auto_delete=False) channel.queue_declare(queue="standard", auto_delete=True) channel.queue_bind( queue="standard", exchange="test_exchange", routing_key="standard_key") # Note: prefetch is set to 1 here as an example only and to keep the number of threads created # to a reasonable amount. In production you will want to test with different prefetch values # to find which one provides the best performance and usability for your solution channel.basic_qos(prefetch_count=1) threads = [] on_message_callback = functools.partial(on_message, args=(connection, threads)) channel.basic_consume('standard', on_message_callback) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() # Wait for all to complete for thread in threads: thread.join() connection.close()