This document is for Kombu's development version, which can differ significantly from previous releases. The stable documentation is for version 5.0.
Azure Service Bus Transport - kombu.transport.azureservicebus¶
Azure Service Bus Message Queue transport module for kombu.
Note that the Shared Access Policy used to connect to Azure Service Bus requires Manage, Send and Listen claims since the broker will create new queues and delete old queues as required.
Note for Celery users whose programs do not terminate properly: the Azure
Service Bus SDK uses the Azure uAMQP library, which creates its own threads.
These threads are shut down when the AzureServiceBus Channel is closed, but
Celery does not always close the channel, so the threads can be left running.
Because the uAMQP threads are not marked as daemon threads, they are not
killed when the main thread exits. Setting the uamqp_keep_alive_interval
transport option to 0 prevents the keep_alive thread from starting.
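As a minimal sketch of the workaround described above, the fragment below builds a transport-options mapping with the keep-alive interval set to 0. The name `broker_transport_options` is Celery's setting for options handed to the broker transport; everything else here is illustrative.

```python
# Transport options for the Azure Service Bus transport.
# A keep-alive interval of 0 is the workaround described above:
# it prevents the uAMQP keep_alive thread from being started.
broker_transport_options = {
    "uamqp_keep_alive_interval": 0,
}
```

When Kombu is used directly rather than through Celery, the same mapping can be passed as the `transport_options` argument of `kombu.Connection`.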
More information about Azure Service Bus: https://azure.microsoft.com/en-us/services/service-bus/
Features¶
Type: Virtual
Supports Direct: Unreviewed
Supports Topic: Unreviewed
Supports Fanout: Unreviewed
Supports Priority: Unreviewed
Supports TTL: Unreviewed
Connection String¶
Connection string has the following format:
azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE
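The format above can be assembled from its three parts as in the sketch below. `build_broker_url` is a hypothetical helper, not part of Kombu, and the credential values are placeholders. Whether special characters in the key need escaping is not covered by the text above, so the sketch passes the key through unchanged.

```python
def build_broker_url(policy_name: str, sas_key: str, namespace: str) -> str:
    """Assemble a URL in the azureservicebus://POLICY:KEY@NAMESPACE format."""
    return f"azureservicebus://{policy_name}:{sas_key}@{namespace}"


# Placeholder values only; real credentials should come from a secret store.
url = build_broker_url("RootManageSharedAccessKey", "example-key", "my-namespace")
print(url)
```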
Transport Options¶
queue_name_prefix
- String prefix to prepend to queue names in a service bus namespace.
wait_time_seconds
- Number of seconds to wait to receive messages. Default: 5
peek_lock_seconds
- Number of seconds the message is visible for before it is requeued and sent to another consumer. Default: 60
uamqp_keep_alive_interval
- Interval in seconds at which the Azure uAMQP library should send keepalive messages. Default: 30
retry_total
- Azure SDK retry total. Default: 3
retry_backoff_factor
- Azure SDK exponential backoff factor. Default: 0.8
retry_backoff_max
- Azure SDK maximum back-off interval in seconds. Default: 120
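The options listed above might be collected as follows. This is only a sketch: `DEFAULT_TRANSPORT_OPTIONS` and `effective_options` are hypothetical names, the defaults are copied from the list above, and the check for unknown keys is an addition for catching typos, not something Kombu is stated to do.

```python
# Defaults as documented in the list above. queue_name_prefix has no
# documented default, so it is only accepted as an override.
DEFAULT_TRANSPORT_OPTIONS = {
    "wait_time_seconds": 5,
    "peek_lock_seconds": 60,
    "uamqp_keep_alive_interval": 30,
    "retry_total": 3,
    "retry_backoff_factor": 0.8,
    "retry_backoff_max": 120,
}


def effective_options(overrides: dict) -> dict:
    """Return the documented defaults overlaid with caller-supplied overrides."""
    known = set(DEFAULT_TRANSPORT_OPTIONS) | {"queue_name_prefix"}
    unknown = set(overrides) - known
    if unknown:
        # Hypothetical safety net: reject misspelled option names early.
        raise KeyError(f"unrecognised transport options: {sorted(unknown)}")
    return {**DEFAULT_TRANSPORT_OPTIONS, **overrides}


transport_options = effective_options(
    {"queue_name_prefix": "staging-", "peek_lock_seconds": 120}
)
```

The resulting mapping is the kind of value passed as `transport_options` to `kombu.Connection`.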
Transport¶
- class kombu.transport.azureservicebus.Transport(client, **kwargs)[source]¶
Azure Service Bus transport.
- class Channel(*args, **kwargs)¶
Azure Service Bus channel.
- basic_ack(delivery_tag: str, multiple: bool = False) → None¶
Acknowledge message.
- basic_cancel(consumer_tag)¶
Cancel consumer by consumer tag.
- basic_consume(queue, no_ack, *args, **kwargs)¶
Consume from queue.
- close() → None¶
Close channel.
Cancel all consumers, and requeue unacked messages.
- property conninfo¶
- default_peek_lock_seconds = 60¶
- default_retry_backoff_factor = 0.8¶
- default_retry_backoff_max = 120¶
- default_retry_total = 3¶
- default_uamqp_keep_alive_interval = 30¶
- default_wait_time_seconds = 5¶
- domain_format = 'kombu%(vhost)s'¶
- entity_name(name: str, table: Optional[Dict[int, int]] = None) → str¶
Format AMQP queue name into a valid ServiceBus queue name.
- property peek_lock_seconds: int¶
- property queue_mgmt_service: azure.servicebus.management._management_client.ServiceBusAdministrationClient¶
- property queue_name_prefix: str¶
- property queue_service: azure.servicebus._servicebus_client.ServiceBusClient¶
- property retry_backoff_factor: float¶
- property retry_backoff_max: int¶
- property retry_total: int¶
- property transport_options¶
- property uamqp_keep_alive_interval: int¶
- property wait_time_seconds: int¶
- classmethod as_uri(uri: str, include_password=False, mask='**') → str[source]¶
Customise the display format of the URI.
- can_parse_url = True¶
Set to True if Connection should pass the URL unmodified.
- default_port = None¶
Port number used when no port is specified.
- polling_interval = 1¶
Time to sleep between unsuccessful polls.
Channel¶
- class kombu.transport.azureservicebus.Channel(*args, **kwargs)[source]¶
Azure Service Bus channel.
- property conninfo¶
- default_peek_lock_seconds = 60¶
- default_retry_backoff_factor = 0.8¶
- default_retry_backoff_max = 120¶
- default_retry_total = 3¶
- default_uamqp_keep_alive_interval = 30¶
- default_wait_time_seconds = 5¶
- domain_format = 'kombu%(vhost)s'¶
- entity_name(name: str, table: Optional[Dict[int, int]] = None) → str[source]¶
Format AMQP queue name into a valid ServiceBus queue name.
- property peek_lock_seconds: int¶
- property queue_mgmt_service: azure.servicebus.management._management_client.ServiceBusAdministrationClient¶
- property queue_name_prefix: str¶
- property queue_service: azure.servicebus._servicebus_client.ServiceBusClient¶
- property retry_backoff_factor: float¶
- property retry_backoff_max: int¶
- property retry_total: int¶
- property transport_options¶
- property uamqp_keep_alive_interval: int¶
- property wait_time_seconds: int¶
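The `table: Optional[Dict[int, int]]` parameter of `entity_name` has the shape of a `str.translate` table. The sketch below shows how such a table can turn an AMQP-style queue name into a more restricted character set. `EXAMPLE_TABLE` and `example_entity_name` are hypothetical, and the choice of which characters to replace is an assumption; Kombu's own default table may differ.

```python
import string
from typing import Dict, Optional

# Assumed table: map every ASCII punctuation character except "_", "-"
# and "." to an underscore. Kombu's real default table may differ.
EXAMPLE_TABLE: Dict[int, int] = {
    ord(ch): ord("_") for ch in string.punctuation if ch not in "_-."
}


def example_entity_name(name: str, table: Optional[Dict[int, int]] = None) -> str:
    """Replace characters with str.translate, mirroring the documented signature."""
    return name.translate(table if table is not None else EXAMPLE_TABLE)


print(example_entity_name("celery@worker1.pidbox"))  # celery_worker1.pidbox
```

Passing a custom `table` overrides the replacement rules, which is presumably the purpose of the optional argument in the documented signature.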