Created 04-11-2017 04:58 AM
We are using the kafka-python package v1.3.3.
HDP 2.5 (Kerberized); Kafka 0.10, Kerberized (PLAINTEXTSASL).
Command-line utils are working properly (topic, consumer, producer).
Scala (through spark-shell) can produce messages into topics using KafkaProducer.
The problem is with Python:
pyspark \
  --master local \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=ke.conf"
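Here ke.conf is a standard Kafka client JAAS file. For context, a typical layout looks like the one below; the keytab path and principal are placeholders, not the real values from this cluster:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName="kafka"
  keyTab="/etc/security/keytabs/myuser.keytab"   // placeholder
  principal="myuser@EXAMPLE.COM";                // placeholder
};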
from kafka import KafkaProducer
from kafka import KafkaClient
from kafka import KafkaConsumer

kafk_security_protocol = 'PLAINTEXTSASL'
#kafk_security_protocol = 'SASL_PLAINTEXTSASL'
broker_list = 'broker1:6667,broker2:6667'

producer = KafkaProducer(
      bootstrap_servers = broker_list
    , api_version_auto_timeout_ms = 10000
    , security_protocol = kafk_security_protocol
)

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 335, in __init__
    **self.config)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 210, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 808, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

# ----- adding specific version:
producer = KafkaProducer(
      bootstrap_servers = broker_list
    , api_version_auto_timeout_ms = 10000
    , security_protocol = kafk_security_protocol
    , api_version = (0, 10)
)

# --- no failure, producer created, but....
producer.send(topic='test_me', partition=0, value='HELLO')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 492, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 615, in _wait_on_metadata
    "Failed to update metadata after %s secs.", max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: ('Failed to update metadata after %s secs.', 60.0)
If I remove the security protocol, the result is the same, so it doesn't look like it has any effect.
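For comparison, newer kafka-python documentation describes Kerberos via security_protocol='SASL_PLAINTEXT' together with sasl_mechanism='GSSAPI', rather than the HDP-style PLAINTEXTSASL alias. A rough, untested sketch, assuming the gssapi Python package is installed and a valid Kerberos ticket (or keytab) is available:

# Sketch only: SASL_PLAINTEXT + GSSAPI as described in the kafka-python docs.
# Assumes the 'gssapi' package is installed and 'kinit' has been run;
# broker addresses and topic name are the same placeholders as above.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker1:6667', 'broker2:6667'],
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name='kafka',  # service principal name of the brokers
    api_version=(0, 10)
)
producer.send('test_me', value=b'HELLO')
producer.flush()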
Is there a way to connect to Kafka from Python?
Thanks!
Created 09-14-2017 10:39 AM
Hi @Ed Berezitsky, did you find a solution for this issue? I am having a similar situation.
Created 09-22-2017 04:20 PM
PLAINTEXTSASL is not supported in pyspark on HDP 2.5 (Kafka 0.10), so I worked around it with HDF instead of Spark on the HDP side.
Created 09-22-2017 12:07 PM
Hi @Ed Berezitsky,
Try changing broker_list to a list instead of a string:
broker_list = ['broker1:6667', 'broker2:6667']
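For example (a sketch with just that change; the security-related settings from the question would stay as they are):

# Sketch: bootstrap_servers given as a list of 'host:port' strings.
# The security_protocol setting from the question is omitted here for brevity.
from kafka import KafkaProducer

broker_list = ['broker1:6667', 'broker2:6667']
producer = KafkaProducer(
    bootstrap_servers=broker_list,
    api_version_auto_timeout_ms=10000,
    api_version=(0, 10)
)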
Thanks,
Aditya
Created 09-22-2017 04:22 PM
It can connect to the brokers; the problem is that SASL protocols aren't supported in the Kafka producer.