Support Questions
Find answers, ask questions, and share your expertise
Announcements
Check out our newest addition to the community, the Cloudera Innovation Accelerator group hub.

How to authenticate (through Kerborized cluster) to kafka using python.

Rising Star

Hi,

 

I am trying to connect Kafka from my local machine to kafka kerberized cluster using python, but i am connect with below credentials. Could any guide me and you help is appreciated.

consumer = KafkaConsumer('test',bootstrap_servers='XXX:1234',
   #client_id= kafka-python- + __version__,
   request_timeout_ms=30000,
   connections_max_idle_ms=9 * 60 * 1000,
   reconnect_backoff_ms=50,
   reconnect_backoff_max_ms=1000,
   max_in_flight_requests_per_connection=5,
   receive_buffer_bytes=None,
   send_buffer_bytes=None,
   #socket_options= [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
   sock_chunk_bytes=4096, # undocumented experimental option
   sock_chunk_buffer_count=1000, # undocumented experimental option
   retry_backoff_ms=100,
   metadata_max_age_ms=300000,
   security_protocol='SASL_SSL',
   ssl_context=None,
   ssl_check_hostname=True,
   ssl_cafile=None,
   ssl_certfile=None,
   ssl_keyfile=None,
   ssl_password=None,
   ssl_crlfile=None,
   api_version=None,
   api_version_auto_timeout_ms=2000,
   #selector=selectors.DefaultSelector,
   sasl_mechanism='GSSAPI',
   #sasl_plain_username= None,
   #sasl_plain_password='XXX',
   sasl_kerberos_service_name='XXX',
   # metrics configs
   metric_reporters=[],
   metrics_num_samples=2,
   metrics_sample_window_ms=30000)

for msg in consumer:
    print(msg)

 

Please guide and you help is appreciated.

 

Thanks

1 REPLY 1

Rising Star

Traceback (most recent call last):

  File "consumer.py", line 8, in <module>

    consumer = KafkaConsumer('test',

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/consumer/group.py", line 355, in __init__

    self._client = KafkaClient(metrics=self._metrics, **self.config)

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 242, in __init__

    self.config['api_version'] = self.check_version(timeout=check_timeout)

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 907, in check_version

    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 1228, in check_version

    if not self.connect_blocking(timeout_at - time.time()):

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 337, in connect_blocking

    self.connect()

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 426, in connect

    if self._try_handshake():

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 505, in _try_handshake

    self._sock.do_handshake()

  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/ssl.py", line 1309, in do_handshake

    self._sslobj.do_handshake()

ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1108)

 

I am getting above error after running the program, Any inputs ?