Kafka-python issues in kerberized HDP 2.5

Expert Contributor

We are using kafka-python package v1.3.3

HDP: 2.5 (Kerberized). Kafka (0.10) - kerberized (PLAINTEXTSASL)

The command-line utilities (topics, console consumer, console producer) work properly.

Scala (through the Spark shell) can produce messages into topics using KafkaProducer.

The problem is with Python:

pyspark \
 --master local \
 --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=ke.conf"

from kafka import KafkaProducer
from kafka import KafkaClient
from kafka import KafkaConsumer

kafk_security_protocol = 'PLAINTEXTSASL'
# kafk_security_protocol = 'SASL_PLAINTEXTSASL'
broker_list = 'broker1:6667,broker2:6667'

producer = KafkaProducer(
    bootstrap_servers=broker_list,
    api_version_auto_timeout_ms=10000,
    security_protocol=kafk_security_protocol
)
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 335, in __init__
    **self.config)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 210, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 808, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

# ----- adding specific version:
producer = KafkaProducer(
    bootstrap_servers=broker_list,
    api_version_auto_timeout_ms=10000,
    security_protocol=kafk_security_protocol,
    api_version=(0, 10)
)
# --- no failure, producer created, but....
producer.send(topic='test_me', partition=0, value='HELLO')

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 492, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 615, in _wait_on_metadata
    "Failed to update metadata after %s secs.", max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: ('Failed to update metadata after %s secs.', 60.0)

If I remove the security protocol, the result is the same, so it doesn't seem to have any effect.
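For reference, a possible explanation (an assumption, not confirmed in this thread): kafka-python uses the standard Kafka protocol name SASL_PLAINTEXT together with an explicit SASL mechanism, rather than HDP's legacy PLAINTEXTSASL alias, and GSSAPI (Kerberos) support arrived in kafka-python releases newer than 1.3.3 and additionally requires the gssapi package. A minimal sketch of such a configuration, with placeholder broker hosts:

```python
# Sketch (assumption): for a kerberized cluster, kafka-python expects
# SASL_PLAINTEXT + GSSAPI, not HDP's legacy 'PLAINTEXTSASL' alias.
# Broker hosts are placeholders; GSSAPI needs a kafka-python release
# newer than 1.3.3 plus the `gssapi` package installed.
producer_config = {
    'bootstrap_servers': ['broker1:6667', 'broker2:6667'],
    'security_protocol': 'SASL_PLAINTEXT',     # not 'PLAINTEXTSASL'
    'sasl_mechanism': 'GSSAPI',
    'sasl_kerberos_service_name': 'kafka',     # kafka-python's default
    'api_version': (0, 10),
}

# producer = KafkaProducer(**producer_config)  # needs a reachable broker
print(sorted(producer_config))
```

An unrecognized security_protocol value would explain why changing or removing it appears to make no difference: the client never completes the handshake either way.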

Is there a way to connect to Kafka from Python?

Thanks!

4 REPLIES

Rising Star

Hi @Ed Berezitsky, did you find a solution for this issue? I'm having a similar situation.

Expert Contributor

PLAINTEXTSASL is not supported in pyspark on HDP 2.5 (Kafka 0.10), so as a workaround I used HDF instead of HDP-side Spark.

Hi @Ed Berezitsky,

Try changing broker_list to a list instead of a string:

broker_list = [ 'broker1:6667', 'broker2:6667']

Thanks,

Aditya

Expert Contributor

It can connect to the brokers; the problem is that SASL protocols aren't supported by the Kafka producer.
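One way to separate "broker unreachable" from "protocol mismatch" is a bare TCP connect: if that succeeds while KafkaProducer still raises NoBrokersAvailable, the network path is fine and the security handshake is the likely culprit. A sketch with a hypothetical helper (host and port are placeholders):

```python
import socket

def port_open(host, port, timeout=3.0):
    """Return True if a plain TCP connection to host:port succeeds."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.close()
        return True
    except (socket.error, OSError):
        return False

# Example usage: check each broker from broker_list, e.g.
# port_open('broker1', 6667)
```

If port_open returns True for every broker but the producer still cannot fetch metadata, the failure is happening above TCP, in the protocol/authentication layer.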
