This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.asynchronous.aws.sqs.connection
"""Amazon SQS Connection."""
from vine import transform
from kombu.asynchronous.aws.connection import AsyncAWSQueryConnection
from .ext import boto3
from .message import AsyncMessage
from .queue import AsyncQueue
__all__ = ('AsyncSQSConnection',)
[docs]class AsyncSQSConnection(AsyncAWSQueryConnection):
"""Async SQS Connection."""
def __init__(self, sqs_connection, debug=0, region=None, **kwargs):
if boto3 is None:
raise ImportError('boto3 is not installed')
super().__init__(
sqs_connection,
region_name=region, debug=debug,
**kwargs
)
[docs] def create_queue(self, queue_name,
visibility_timeout=None, callback=None):
params = {'QueueName': queue_name}
if visibility_timeout:
params['DefaultVisibilityTimeout'] = format(
visibility_timeout, 'd',
)
return self.get_object('CreateQueue', params,
callback=callback)
[docs] def delete_queue(self, queue, force_deletion=False, callback=None):
return self.get_status('DeleteQueue', None, queue.id,
callback=callback)
[docs] def get_queue_url(self, queue):
res = self.sqs_connection.get_queue_url(QueueName=queue)
return res['QueueUrl']
[docs] def get_queue_attributes(self, queue, attribute='All', callback=None):
return self.get_object(
'GetQueueAttributes', {'AttributeName': attribute},
queue.id, callback=callback,
)
[docs] def set_queue_attribute(self, queue, attribute, value, callback=None):
return self.get_status(
'SetQueueAttribute',
{'Attribute.Name': attribute, 'Attribute.Value': value},
queue.id, callback=callback,
)
[docs] def receive_message(
self, queue, queue_url, number_messages=1, visibility_timeout=None,
attributes=('ApproximateReceiveCount',), wait_time_seconds=None,
callback=None
):
params = {'MaxNumberOfMessages': number_messages}
if visibility_timeout:
params['VisibilityTimeout'] = visibility_timeout
if attributes:
attrs = {}
for idx, attr in enumerate(attributes):
attrs['AttributeName.' + str(idx + 1)] = attr
params.update(attrs)
if wait_time_seconds is not None:
params['WaitTimeSeconds'] = wait_time_seconds
return self.get_list(
'ReceiveMessage', params, [('Message', AsyncMessage)],
queue_url, callback=callback, parent=queue,
)
[docs] def delete_message(self, queue, receipt_handle, callback=None):
return self.delete_message_from_handle(
queue, receipt_handle, callback,
)
[docs] def delete_message_batch(self, queue, messages, callback=None):
params = {}
for i, m in enumerate(messages):
prefix = f'DeleteMessageBatchRequestEntry.{i + 1}'
params.update({
f'{prefix}.Id': m.id,
f'{prefix}.ReceiptHandle': m.receipt_handle,
})
return self.get_object(
'DeleteMessageBatch', params, queue.id,
verb='POST', callback=callback,
)
[docs] def delete_message_from_handle(self, queue, receipt_handle,
callback=None):
return self.get_status(
'DeleteMessage', {'ReceiptHandle': receipt_handle},
queue, callback=callback,
)
[docs] def send_message(self, queue, message_content,
delay_seconds=None, callback=None):
params = {'MessageBody': message_content}
if delay_seconds:
params['DelaySeconds'] = int(delay_seconds)
return self.get_object(
'SendMessage', params, queue.id,
verb='POST', callback=callback,
)
[docs] def send_message_batch(self, queue, messages, callback=None):
params = {}
for i, msg in enumerate(messages):
prefix = f'SendMessageBatchRequestEntry.{i + 1}'
params.update({
f'{prefix}.Id': msg[0],
f'{prefix}.MessageBody': msg[1],
f'{prefix}.DelaySeconds': msg[2],
})
return self.get_object(
'SendMessageBatch', params, queue.id,
verb='POST', callback=callback,
)
[docs] def change_message_visibility(self, queue, receipt_handle,
visibility_timeout, callback=None):
return self.get_status(
'ChangeMessageVisibility',
{'ReceiptHandle': receipt_handle,
'VisibilityTimeout': visibility_timeout},
queue.id, callback=callback,
)
[docs] def change_message_visibility_batch(self, queue, messages, callback=None):
params = {}
for i, t in enumerate(messages):
pre = f'ChangeMessageVisibilityBatchRequestEntry.{i + 1}'
params.update({
f'{pre}.Id': t[0].id,
f'{pre}.ReceiptHandle': t[0].receipt_handle,
f'{pre}.VisibilityTimeout': t[1],
})
return self.get_object(
'ChangeMessageVisibilityBatch', params, queue.id,
verb='POST', callback=callback,
)
[docs] def get_all_queues(self, prefix='', callback=None):
params = {}
if prefix:
params['QueueNamePrefix'] = prefix
return self.get_list(
'ListQueues', params, [('QueueUrl', AsyncQueue)],
callback=callback,
)
[docs] def get_queue(self, queue_name, callback=None):
# TODO Does not support owner_acct_id argument
return self.get_all_queues(
queue_name,
transform(self._on_queue_ready, callback, queue_name),
)
lookup = get_queue
def _on_queue_ready(self, name, queues):
return next(
(q for q in queues if q.url.endswith(name)), None,
)
[docs] def get_dead_letter_source_queues(self, queue, callback=None):
return self.get_list(
'ListDeadLetterSourceQueues', {'QueueUrl': queue.url},
[('QueueUrl', AsyncQueue)],
callback=callback,
)
[docs] def add_permission(self, queue, label, aws_account_id, action_name,
callback=None):
return self.get_status(
'AddPermission',
{'Label': label,
'AWSAccountId': aws_account_id,
'ActionName': action_name},
queue.id, callback=callback,
)
[docs] def remove_permission(self, queue, label, callback=None):
return self.get_status(
'RemovePermission', {'Label': label}, queue.id, callback=callback,
)