BrokerConnection

class kafka.BrokerConnection(host, port, afi, **configs)[source]

Initialize a Kafka broker connection

Keyword Arguments:
client_id (str): a name for this client. This string is passed in

each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’

reconnect_backoff_ms (int): The amount of time in milliseconds to

wait before attempting to reconnect to a given host. Default: 50.

reconnect_backoff_max_ms (int): The maximum amount of time in

milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000.

request_timeout_ms (int): Client request timeout in milliseconds.

Default: 30000.

max_in_flight_requests_per_connection (int): Requests are pipelined

to kafka brokers up to this number of maximum requests per broker connection. Default: 5.

receive_buffer_bytes (int): The size of the TCP receive buffer

(SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.

send_buffer_bytes (int): The size of the TCP send buffer

(SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.

socket_options (list): List of tuple-arguments to socket.setsockopt

to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

security_protocol (str): Protocol used to communicate with brokers.

Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.

ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping

socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.

ssl_check_hostname (bool): flag to configure whether ssl handshake

should verify that the certificate matches the brokers hostname. default: True.

ssl_cafile (str): optional filename of ca file to use in certificate

verification. default: None.

ssl_certfile (str): optional filename of file in pem format containing

the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. default: None.

ssl_keyfile (str): optional filename containing the client private key.

default: None.

ssl_password (callable, str, bytes, bytearray): optional password or

callable function that returns a password, for decrypting the client private key. Default: None.

ssl_crlfile (str): optional filename containing the CRL to check for

certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: None.

ssl_ciphers (str): optionally set the available ciphers for ssl

connections. It should be a string in the OpenSSL cipher list format. If no cipher can be selected (because compile-time options or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers

api_version (tuple): Specify which Kafka API version to use.

Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)

api_version_auto_timeout_ms (int): number of milliseconds to throw a

timeout exception from the constructor when checking the broker api version. Only applies if api_version is None

selector (selectors.BaseSelector): Provide a specific selector

implementation to use for I/O multiplexing. Default: selectors.DefaultSelector

state_change_callback (callable): function to be called when the

connection state changes from CONNECTING to CONNECTED etc.

metrics (kafka.metrics.Metrics): Optionally provide a metrics

instance for capturing network IO stats. Default: None.

metric_group_prefix (str): Prefix for metric names. Default: ‘’ sasl_mechanism (str): Authentication mechanism when security_protocol

is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.

sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.

Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.

Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

sasl_kerberos_service_name (str): Service name to include in GSSAPI

sasl mechanism handshake. Default: ‘kafka’

sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI

sasl mechanism handshake. Default: one of bootstrap servers

sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider

instance. (See kafka.oauth.abstract). Default: None

blacked_out()[source]

Return true if we are disconnected from the given node and can’t re-establish a connection yet

can_send_more()[source]

Return True unless there are max_in_flight_requests_per_connection.

check_version(timeout=2, strict=False, topics=[])[source]

Attempt to guess the broker version.

Note: This is a blocking call.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), …

close(error=None)[source]

Close socket and fail all in-flight-requests.

Arguments:
error (Exception, optional): pending in-flight-requests

will be failed with this exception. Default: kafka.errors.KafkaConnectionError.

connect()[source]

Attempt to connect and return ConnectionState

connected()[source]

Return True iff socket is connected.

connecting()[source]

Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).

connection_delay()[source]

Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting or connected, returns a very large number to handle slow/stalled connections.

disconnected()[source]

Return True iff socket is closed

recv()[source]

Non-blocking network receive.

Return list of (response, future) tuples

send(request, blocking=True)[source]

Queue request for async network send, return Future()

send_pending_requests()[source]

Attempts to send pending requests messages via blocking IO If all requests have been sent, return True Otherwise, if the socket is blocked and there are more bytes to send, return False.

send_pending_requests_v2()[source]

Attempts to send pending requests messages via non-blocking IO If all requests have been sent, return True Otherwise, if the socket is blocked and there are more bytes to send, return False.