How to authenticate to a Kerberized Kafka cluster using Python

Hi,

I am trying to connect from my local machine to a Kerberized Kafka cluster using Python (kafka-python), but I am unable to connect with the settings below. Could anyone guide me?

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='XXX:1234',
    #client_id='kafka-python-' + __version__,
    request_timeout_ms=30000,
    connections_max_idle_ms=9 * 60 * 1000,
    reconnect_backoff_ms=50,
    reconnect_backoff_max_ms=1000,
    max_in_flight_requests_per_connection=5,
    receive_buffer_bytes=None,
    send_buffer_bytes=None,
    #socket_options=[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    sock_chunk_bytes=4096,  # undocumented experimental option
    sock_chunk_buffer_count=1000,  # undocumented experimental option
    retry_backoff_ms=100,
    metadata_max_age_ms=300000,
    # TLS settings: no CA file, client certificate, or key is supplied
    security_protocol='SASL_SSL',
    ssl_context=None,
    ssl_check_hostname=True,
    ssl_cafile=None,
    ssl_certfile=None,
    ssl_keyfile=None,
    ssl_password=None,
    ssl_crlfile=None,
    api_version=None,
    api_version_auto_timeout_ms=2000,
    #selector=selectors.DefaultSelector,
    # Kerberos (GSSAPI) authentication
    sasl_mechanism='GSSAPI',
    #sasl_plain_username=None,
    #sasl_plain_password='XXX',
    sasl_kerberos_service_name='XXX',
    # metrics configs
    metric_reporters=[],
    metrics_num_samples=2,
    metrics_sample_window_ms=30000,
)

for msg in consumer:
    print(msg)

Any guidance would be appreciated.

Thanks

1 Reply

Traceback (most recent call last):
  File "consumer.py", line 8, in <module>
    consumer = KafkaConsumer('test',
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/consumer/group.py", line 355, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 242, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 907, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 1228, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 337, in connect_blocking
    self.connect()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 426, in connect
    if self._try_handshake():
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 505, in _try_handshake
    self._sock.do_handshake()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1108)

I am getting the above error after running the program. Any input?
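
The traceback stops in the TLS handshake, before any Kerberos exchange takes place. The message "self signed certificate in certificate chain" suggests the broker's certificate chain ends in a CA that the local Python installation does not trust. One possible direction, sketched below, is to build an SSL context that trusts the cluster's CA certificate and pass it through the ssl_context parameter already present in the call above. The helper names (build_ssl_context, make_consumer) and the ca_file path are hypothetical and not from the original post. The sketch also assumes that the CA certificate has been exported in PEM format, that a valid Kerberos ticket exists (for example via kinit), and that the gssapi Python package is installed for the GSSAPI mechanism.

```python
import ssl


def build_ssl_context(ca_file=None):
    """Return a client-side SSL context that verifies the broker certificate.

    ca_file is a hypothetical path to the cluster CA certificate in PEM
    format. When it is None, only the system trust store is used.
    """
    ctx = ssl.create_default_context(cafile=ca_file)
    # create_default_context already enables these; set explicitly for clarity.
    ctx.check_hostname = True
    ctx.verify_mode = ssl.CERT_REQUIRED
    return ctx


def make_consumer(topic, servers, service_name, ca_file):
    """Hypothetical helper: create a consumer using SASL_SSL with GSSAPI."""
    # Imported here so the module loads even where kafka-python is absent.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        security_protocol='SASL_SSL',
        ssl_context=build_ssl_context(ca_file),
        sasl_mechanism='GSSAPI',
        sasl_kerberos_service_name=service_name,
    )
```

With those assumptions, a call might look like make_consumer('test', 'XXX:1234', 'XXX', '/path/to/cluster-ca.pem'), where the last argument is a placeholder for wherever the CA certificate is stored. Passing the same file through ssl_cafile instead of ssl_context should have a similar effect. Turning verification off would also make the error go away, but it removes the protection TLS is meant to provide.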