BrokerConnection¶
- class kafka.BrokerConnection(host, port, afi, **configs)[source]¶
Initialize a Kafka broker connection
- Keyword Arguments:
- client_id (str): a name for this client. This string is passed in
each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
- reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. Default: 50.
- reconnect_backoff_max_ms (int): The maximum amount of time in
milliseconds to back off when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically at this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff, resulting in a random range between 20% below and 20% above the computed value. Default: 1000.
- request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
- max_in_flight_requests_per_connection (int): Requests are pipelined
to Kafka brokers, up to this maximum number of requests per broker connection. Default: 5.
- receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
- send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
- socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
- security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
- ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
- ssl_check_hostname (bool): flag to configure whether the SSL handshake
should verify that the certificate matches the broker's hostname. Default: True.
- ssl_cafile (str): optional filename of the CA file to use in certificate
verification. Default: None.
- ssl_certfile (str): optional filename of a file in PEM format containing
the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. Default: None.
- ssl_keyfile (str): optional filename containing the client private key.
Default: None.
- ssl_password (callable, str, bytes, bytearray): optional password or
callable function that returns a password, for decrypting the client private key. Default: None.
- ssl_crlfile (str): optional filename containing the CRL to check for
certificate revocation. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. Default: None.
- ssl_ciphers (str): optionally set the available ciphers for ssl
connections. It should be a string in the OpenSSL cipher list format. If no cipher can be selected (because compile-time options or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
- api_version (tuple): Specify which Kafka API version to use.
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
- api_version_auto_timeout_ms (int): number of milliseconds to wait before
raising a timeout exception from the constructor when checking the broker API version. Only applies if api_version is None.
- selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
- state_change_callback (callable): function to be called when the
connection state changes (for example, from CONNECTING to CONNECTED).
- metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.
- metric_group_prefix (str): Prefix for metric names. Default: ‘’
- sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
- sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
- sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
- sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: ‘kafka’
- sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
- sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
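The interaction between reconnect_backoff_ms and reconnect_backoff_max_ms described above can be illustrated with a minimal sketch. This is not the library's implementation: the helper name `reconnect_backoff_ms`, its parameters, and the doubling growth factor are assumptions made for illustration (the text above only says the backoff grows exponentially). Only the defaults (50 and 1000) and the 0.2 randomization factor come from the descriptions above.

```python
import random


def reconnect_backoff_ms(failures, base_ms=50, max_ms=1000, jitter=0.2, rng=random):
    """Hypothetical helper: backoff in ms after `failures` (>= 1) consecutive failures."""
    # Assumption: the delay doubles with each failure after the first,
    # and is capped at max_ms (reconnect_backoff_max_ms).
    computed = min(base_ms * 2 ** (failures - 1), max_ms)
    # Randomize within 20% below and 20% above the computed value
    # to avoid many clients reconnecting at the same instant.
    return computed * rng.uniform(1 - jitter, 1 + jitter)


for attempt in range(1, 8):
    print(attempt, round(reconnect_backoff_ms(attempt), 1))
```

Under these assumptions the computed value reaches the 1000 ms cap on the sixth consecutive failure and stays there, with each delay still randomized by up to 20% in either direction.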
- blacked_out()[source]¶
Return True if we are disconnected from the given node and can’t re-establish a connection yet.
- check_version(timeout=2, strict=False, topics=[])[source]¶
Attempt to guess the broker version.
Note: This is a blocking call.
Returns: version tuple, e.g. (0, 10), (0, 9), (0, 8, 2), …
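Because the return value is a plain tuple, it can be compared against a minimum version with ordinary tuple comparison. The sketch below shows that idea only; `supports_feature` is a hypothetical helper, not part of kafka-python, and the version tuples are passed in directly rather than obtained from a live broker.

```python
def supports_feature(broker_version, minimum_version):
    """Hypothetical helper: True if broker_version is at least minimum_version."""
    # Python compares tuples element by element, so
    # (0, 8, 2) < (0, 9) < (0, 10) holds without any string parsing.
    return tuple(broker_version) >= tuple(minimum_version)


print(supports_feature((0, 10), (0, 9)))    # True
print(supports_feature((0, 8, 2), (0, 9)))  # False
```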
- close(error=None)[source]¶
Close socket and fail all in-flight requests.
- Arguments:
- error (Exception, optional): pending in-flight requests
will be failed with this exception. Default: kafka.errors.KafkaConnectionError.
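The effect of close() on pending requests can be pictured with a small stand-alone model. Everything here is an assumption for illustration: `SketchConnection` is a hypothetical class, it uses `concurrent.futures.Future` from the standard library in place of whatever future type the library uses internally, and the built-in `ConnectionError` stands in for kafka.errors.KafkaConnectionError.

```python
from concurrent.futures import Future


class SketchConnection:
    """Hypothetical stand-in used for illustration; not kafka.BrokerConnection."""

    def __init__(self):
        self.in_flight = {}  # correlation id -> Future awaiting a response

    def send(self, correlation_id):
        # Register a pending request and hand back its future.
        future = Future()
        self.in_flight[correlation_id] = future
        return future

    def close(self, error=None):
        # Fall back to a default error when the caller supplies none.
        if error is None:
            error = ConnectionError("connection closed")
        # Detach the pending requests first, then fail each one.
        pending, self.in_flight = self.in_flight, {}
        for future in pending.values():
            future.set_exception(error)


conn = SketchConnection()
pending = conn.send(1)
conn.close()
print(type(pending.exception()).__name__)  # ConnectionError
```

Detaching the pending map before failing the futures keeps the model consistent if a callback attached to a future sends a new request while close() is still running.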
- connecting()[source]¶
Returns True if still connecting (this may encompass several different states, such as SSL handshake, authentication, etc.).
- connection_delay()[source]¶
Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting or connected, returns a very large number to handle slow/stalled connections.
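The behaviour described for connection_delay() can be sketched as a small function. This is a simplified model under stated assumptions, not the library's code: the function name `connection_delay_ms`, the string state names, the `last_attempt_s` and `backoff_ms` parameters, and the use of `float("inf")` as the "very large number" are all hypothetical.

```python
import time


def connection_delay_ms(state, last_attempt_s, backoff_ms, now_s=None):
    """Hypothetical model; state is 'disconnected', 'connecting' or 'connected'."""
    if now_s is None:
        now_s = time.time()
    if state == "disconnected":
        # Respect the reconnect backoff: wait only for the time still remaining.
        elapsed_ms = (now_s - last_attempt_s) * 1000
        return max(backoff_ms - elapsed_ms, 0)
    # Connecting or connected: report a very large delay so the caller
    # does not start another connection attempt.
    return float("inf")


print(connection_delay_ms("disconnected", 100.0, 50, now_s=101.0))  # 0
print(connection_delay_ms("connected", 100.0, 50, now_s=101.0))     # inf
```

In this model a non-zero result for a disconnected connection corresponds to the situation that blacked_out() reports: the node is disconnected and the backoff has not yet elapsed.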