This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Source code for kombu.asynchronous.aws.connection
"""Amazon AWS Connection."""
from email import message_from_bytes
from email.mime.message import MIMEMessage
from vine import promise, transform
from kombu.asynchronous.aws.ext import AWSRequest, get_response
from kombu.asynchronous.http import Headers, Request, get_client
def message_from_headers(hdr):
bs = "\r\n".join("{}: {}".format(*h) for h in hdr)
return message_from_bytes(bs.encode())
__all__ = (
'AsyncHTTPSConnection', 'AsyncConnection',
)
class AsyncHTTPResponse:
"""Async HTTP Response."""
def __init__(self, response):
self.response = response
self._msg = None
self.version = 10
def read(self, *args, **kwargs):
return self.response.body
def getheader(self, name, default=None):
return self.response.headers.get(name, default)
def getheaders(self):
return list(self.response.headers.items())
@property
def msg(self):
if self._msg is None:
self._msg = MIMEMessage(message_from_headers(self.getheaders()))
return self._msg
@property
def status(self):
return self.response.code
@property
def reason(self):
if self.response.error:
return self.response.error.message
return ''
def __repr__(self):
return repr(self.response)
[docs]class AsyncHTTPSConnection:
"""Async HTTP Connection."""
Request = Request
Response = AsyncHTTPResponse
method = 'GET'
path = '/'
body = None
default_ports = {'http': 80, 'https': 443}
def __init__(self, strict=None, timeout=20.0, http_client=None):
self.headers = []
self.timeout = timeout
self.strict = strict
self.http_client = http_client or get_client()
[docs] def request(self, method, path, body=None, headers=None):
self.path = path
self.method = method
if body is not None:
try:
read = body.read
except AttributeError:
self.body = body
else:
self.body = read()
if headers is not None:
self.headers.extend(list(headers.items()))
[docs] def getrequest(self):
headers = Headers(self.headers)
return self.Request(self.path, method=self.method, headers=headers,
body=self.body, connect_timeout=self.timeout,
request_timeout=self.timeout, validate_cert=False)
[docs] def getresponse(self, callback=None):
request = self.getrequest()
request.then(transform(self.Response, callback))
return self.http_client.add_request(request)
def __repr__(self):
return f'<AsyncHTTPConnection: {self.getrequest()!r}>'
[docs]class AsyncConnection:
"""Async AWS Connection."""
def __init__(self, sqs_connection, http_client=None, **kwargs):
self.sqs_connection = sqs_connection
self._httpclient = http_client or get_client()
def _mexe(self, request, sender=None, callback=None):
callback = callback or promise()
conn = self.get_http_connection()
if callable(sender):
sender(conn, request.method, request.path, request.body,
request.headers, callback)
else:
conn.request(request.method, request.url,
request.body, request.headers)
conn.getresponse(callback=callback)
return callback
class AsyncAWSQueryConnection(AsyncConnection):
"""Async AWS Query Connection."""
STATUS_CODE_OK = 200
STATUS_CODE_REQUEST_TIMEOUT = 408
STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR = 599
STATUS_CODE_INTERNAL_ERROR = 500
STATUS_CODE_BAD_GATEWAY = 502
STATUS_CODE_SERVICE_UNAVAILABLE_ERROR = 503
STATUS_CODE_GATEWAY_TIMEOUT = 504
STATUS_CODES_SERVER_ERRORS = (
STATUS_CODE_INTERNAL_ERROR,
STATUS_CODE_BAD_GATEWAY,
STATUS_CODE_SERVICE_UNAVAILABLE_ERROR
)
STATUS_CODES_TIMEOUT = (
STATUS_CODE_REQUEST_TIMEOUT,
STATUS_CODE_NETWORK_CONNECT_TIMEOUT_ERROR,
STATUS_CODE_GATEWAY_TIMEOUT
)
def __init__(self, sqs_connection, http_client=None,
http_client_params=None, **kwargs):
if not http_client_params:
http_client_params = {}
super().__init__(sqs_connection, http_client,
**http_client_params)
def make_request(self, operation, params_, path, verb, callback=None): # noqa
params = params_.copy()
if operation:
params['Action'] = operation
signer = self.sqs_connection._request_signer
# defaults for non-get
signing_type = 'standard'
param_payload = {'data': params}
if verb.lower() == 'get':
# query-based opts
signing_type = 'presignurl'
param_payload = {'params': params}
request = AWSRequest(method=verb, url=path, **param_payload)
signer.sign(operation, request, signing_type=signing_type)
prepared_request = request.prepare()
return self._mexe(prepared_request, callback=callback)
def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
return self.make_request(
operation, params, path, verb,
callback=transform(
self._on_list_ready, callback, parent or self, markers,
operation
),
)
def get_object(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
operation, params, path, verb,
callback=transform(
self._on_obj_ready, callback, parent or self, operation
),
)
def get_status(self, operation, params, path='/', parent=None, verb='GET', callback=None): # noqa
return self.make_request(
operation, params, path, verb,
callback=transform(
self._on_status_ready, callback, parent or self, operation
),
)
def _on_list_ready(self, parent, markers, operation, response):
service_model = self.sqs_connection.meta.service_model
if response.status == self.STATUS_CODE_OK:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
elif (
response.status in self.STATUS_CODES_TIMEOUT or
response.status in self.STATUS_CODES_SERVER_ERRORS
):
# When the server returns a timeout or 50X server error,
# the response is interpreted as an empty list.
# This prevents hanging the Celery worker.
return []
else:
raise self._for_status(response, response.read())
def _on_obj_ready(self, parent, operation, response):
service_model = self.sqs_connection.meta.service_model
if response.status == self.STATUS_CODE_OK:
_, parsed = get_response(
service_model.operation_model(operation), response.response
)
return parsed
else:
raise self._for_status(response, response.read())
def _on_status_ready(self, parent, operation, response):
service_model = self.sqs_connection.meta.service_model
if response.status == self.STATUS_CODE_OK:
httpres, _ = get_response(
service_model.operation_model(operation), response.response
)
return httpres.code
else:
raise self._for_status(response, response.read())
def _for_status(self, response, body):
context = 'Empty body' if not body else 'HTTP Error'
return Exception("Request {} HTTP {} {} ({})".format(
context, response.status, response.reason, body
))