Created on 03-25-2019 12:54 PM - edited 08-17-2019 02:34 PM
This article covers how to run a simple producer and consumer in kafka-python (1.4.5) on Kafka 2.0.0, within a kerberized HDP 3.1.0 cluster.
First, install the gssapi library for Python so that kafka-python can authenticate with Kerberos, and install kafka-python itself if you have not already done so. I will be using the latest kafka-python version available at the time of writing this article, which is 1.4.5.
pip install gssapi
pip install kafka-python
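A quick sanity check (a minimal sketch, not part of the original steps) to confirm both libraries installed correctly:

# Both imports should succeed, and the version should print 1.4.5
import gssapi
import kafka
print(kafka.__version__)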
Make sure that Kerberos is correctly enabled for Kafka on your cluster. Verify through Ambari -> Kafka -> Configs that the Kafka broker properties are set to listen on SASL_PLAINTEXT:
Also verify that GSSAPI is listed in the sasl.enabled.mechanisms property:
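For reference, the two broker properties we just verified should look something like the following (the hostname here is from my example cluster; yours will differ):

listeners=SASL_PLAINTEXT://c2175-node4.hwx.com:6667
sasl.enabled.mechanisms=GSSAPI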
Next, we obtain a Kerberos ticket before running our kafka-python producer:
[kafka@c2175-node4 ~]$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/c2175-node4.hwx.com@HWX.COM
[kafka@c2175-node4 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1006
Default principal: kafka/c2175-node4.hwx.com@HWX.COM

Valid starting       Expires              Service principal
03/25/2019 11:59:26  03/26/2019 11:59:26  krbtgt/HWX.COM@HWX.COM
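If you would rather have your script obtain the ticket itself instead of relying on a prior kinit, one option is to shell out to kinit first (a sketch, assuming the same keytab path and principal as above):

import subprocess

# Assumption: same keytab and principal as in the kinit example above
subprocess.check_call([
    'kinit', '-kt', '/etc/security/keytabs/kafka.service.keytab',
    'kafka/c2175-node4.hwx.com@HWX.COM'])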
We will run the following code from Python:
from kafka import KafkaProducer
import logging

# Enable debug logging -- optional, but handy when troubleshooting
logging.basicConfig(level=logging.DEBUG)

producer = KafkaProducer(bootstrap_servers='c2175-node4.hwx.com:6667',
                         api_version=(0, 10),  # the string '0.10' also works but is deprecated
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka',
                         connections_max_idle_ms=1200000,
                         request_timeout_ms=1000000,
                         api_version_auto_timeout_ms=1000000)

# Python 2 syntax: bytes and str concatenate freely.
# On Python 3, use ('Happy %d Days!' % item).encode() instead.
for item in range(10):
    producer.send('new_topic', b'Happy ' + str(item) + ' Days!')
Let me explain some of the parameters in the above Python block:
- security_protocol='SASL_PLAINTEXT' matches the listener protocol we verified on the broker.
- sasl_mechanism='GSSAPI' tells kafka-python to authenticate with Kerberos, and sasl_kerberos_service_name='kafka' must match the service name portion of the broker's Kerberos principal.
- api_version=(0, 10) pins the broker protocol version so the client skips version auto-detection; api_version_auto_timeout_ms only matters when auto-detection is used.
- connections_max_idle_ms and request_timeout_ms raise the idle-connection and request timeouts, which gives slow or busy clusters more breathing room.
Also note that in the code example above I have enabled debug logging. This is of course not required to run the producer, but it will help you in case you run into any errors.
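One practical addition (not shown in the original example): send() only queues messages in a background buffer, so call flush() before your script exits to make sure everything actually reaches the broker:

# Block until all buffered records are sent, then release resources
producer.flush()
producer.close()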
Here is what running the kafka-python producer on a kerberized HDP 3.1 cluster looks like:
[kafka@c2175-node4 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1006
Default principal: kafka/c2175-node4.hwx.com@HWX.COM

Valid starting       Expires              Service principal
03/25/2019 12:24:18  03/26/2019 12:24:18  krbtgt/HWX.COM@HWX.COM

[kafka@c2175-node4 ~]$ python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> import logging
>>> logging.basicConfig(level=logging.DEBUG)
>>> producer = KafkaProducer(bootstrap_servers='c2175-node4.hwx.com:6667', api_version='0.10', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', connections_max_idle_ms=1200000, request_timeout_ms=1000000)
DEBUG:kafka.producer.kafka:Starting the Kafka producer
WARNING:kafka.producer.kafka:use api_version=(0, 10) [tuple] -- "0.10" as str is deprecated
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.metrics.metrics:Added sensor with name bufferpool-wait-time
DEBUG:kafka.metrics.metrics:Added sensor with name batch-size
DEBUG:kafka.metrics.metrics:Added sensor with name compression-rate
DEBUG:kafka.metrics.metrics:Added sensor with name queue-time
DEBUG:kafka.metrics.metrics:Added sensor with name produce-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name records-per-request
DEBUG:kafka.metrics.metrics:Added sensor with name bytes
DEBUG:kafka.metrics.metrics:Added sensor with name record-retries
DEBUG:kafka.metrics.metrics:Added sensor with name errors
DEBUG:kafka.metrics.metrics:Added sensor with name record-size-max
DEBUG:kafka.producer.kafka:Kafka producer started
DEBUG:kafka.producer.sender:Starting Kafka producer I/O thread.
>>> DEBUG:kafka.client:Initializing connection to node bootstrap for metadata request
DEBUG:kafka.client:Initiating connection to node bootstrap at 172.25.38.138:6667
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <disconnected> [IPv4 None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <disconnected> [IPv4 ('172.25.38.138', 6667)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: connecting to 172.25.38.138:6667 [('172.25.38.138', 6667) IPv4]
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='GSSAPI')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]> Request 1: SaslHandShakeRequest_v0(mechanism='GSSAPI')
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]> Response 1 (1.20496749878 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=[u'GSSAPI'])
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: GSSAPI name: kafka/c2175-node4.hwx.com@HWX.COM
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: Authenticated as kafka/c2175-node4.hwx.com@HWX.COM via GSSAPI
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: Connection complete.
DEBUG:kafka.client:Node bootstrap connected
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=NULL) to node bootstrap
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=NULL)
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connected> [IPv4 ('172.25.38.138', 6667)]> Request 2: MetadataRequest_v1(topics=NULL)
DEBUG:kafka.protocol.parser:Received correlation id: 2
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connected> [IPv4 ('172.25.38.138', 6667)]> Response 2 (4.12797927856 ms): MetadataResponse_v1(brokers=[(node_id=1001, host=u'c2175-node4.hwx.com', port=6667, rack=None)], controller_id=1001, topics=[(error_code=0, topic=u'test3', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'__consumer_offsets', is_internal=True, partitions=[(error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'ambari_kafka_service_check', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'foobar', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'new_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 5, groups: 0)
>>> for item in range(10):
...     producer.send('new_topic', b'Happy '+ str(item)+' Days!')
...
DEBUG:kafka.producer.kafka:Sending (key=None value='Happy 0 Days!' headers=[]) to TopicPartition(topic='new_topic', partition=0)
DEBUG:kafka.producer.record_accumulator:Allocating a new 16384 byte message buffer for TopicPartition(topic='new_topic', partition=0)
DEBUG:kafka.producer.kafka:Waking up the sender since TopicPartition(topic='new_topic', partition=0) is either full or getting a new batch
<kafka.producer.future.FutureRecordMetadata object at 0x7f5fc2c99410>
DEBUG:kafka.producer.kafka:Sending (key=None value='Happy 1 Days!' headers=[]) to TopicPartition(topic='new_topic', partition=0)
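Notice that producer.send() returns immediately with a FutureRecordMetadata (visible in the output above); the actual delivery happens asynchronously on the I/O thread. If you want to confirm delivery synchronously, you can block on the returned future:

# Sketch: wait up to 30 seconds for the broker to acknowledge the record
future = producer.send('new_topic', b'Happy 0 Days!')
record_metadata = future.get(timeout=30)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)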
Similarly, here is what a simple consumer in kafka-python 1.4.5 on a kerberized HDP 3.1 cluster looks like:
from kafka import KafkaConsumer
import logging

# Debug logging is optional here as well
logging.basicConfig(level=logging.DEBUG)

consumer = KafkaConsumer('new_topic',
                         bootstrap_servers='c2175-node4.hwx.com:6667',
                         api_version=(0, 10),  # tuple form preferred over the deprecated '0.10' string
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka',
                         connections_max_idle_ms=1200000,
                         request_timeout_ms=1000000)

for msg in consumer:
    print(msg)
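A couple of optional tweaks worth knowing about (these are my own additions, not part of the original example): a group_id enables committed offsets, auto_offset_reset='earliest' makes a new group start from the oldest available message, and consumer_timeout_ms stops the iteration loop instead of blocking forever:

from kafka import KafkaConsumer

consumer = KafkaConsumer('new_topic',
                         bootstrap_servers='c2175-node4.hwx.com:6667',
                         api_version=(0, 10),
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka',
                         group_id='my-python-consumer',  # hypothetical group name
                         auto_offset_reset='earliest',   # start from the oldest message
                         consumer_timeout_ms=10000)      # stop iterating after 10s idle
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)
consumer.close()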
===============================================
If you are running CentOS 7.x, you may encounter the following error while installing gssapi:
Exception: Could not find main GSSAPI shared library. Please try setting GSSAPI_MAIN_LIB yourself or setting ENABLE_SUPPORT_DETECTION to 'false'
This error is thrown because /bin/krb5-config is missing. To satisfy the requirement, install the Kerberos development package:
yum install -y krb5-devel
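After installing it, krb5-config should be on the PATH and the gssapi install should succeed:

krb5-config --version
pip install gssapi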
===============================================
Special thanks to Akshay Mankumbare
Created on 03-29-2019 06:26 PM
Hello,
When I follow this tutorial and launch my KafkaProducer, I get the following error: "AttributeError: 'BrokerConnection' object has no attribute '_sock'".
Created on 04-08-2019 12:53 PM
Sounds like it might be an issue with the api_version used. Are you trying this on HDP 3.1, or a different HDP or HDF version?
Could you share the full error stack?
Created on 04-20-2020 03:07 PM
Hi All,
I am trying to connect from a local machine to a kerberized Kafka cluster through Python. Can anyone help with which properties to specify for the krb5.conf file, and any other required properties?
Your help is appreciated.