How to authenticate to a Kerberized Kafka cluster using Python

Expert Contributor

Hi,

I am trying to connect from my local machine to a Kerberized Kafka cluster using Python (kafka-python), but I am unable to connect with the settings below. Could anyone guide me? Your help is appreciated.

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='XXX:1234',
   #client_id= kafka-python- + __version__,
   request_timeout_ms=30000,
   connections_max_idle_ms=9 * 60 * 1000,
   reconnect_backoff_ms=50,
   reconnect_backoff_max_ms=1000,
   max_in_flight_requests_per_connection=5,
   receive_buffer_bytes=None,
   send_buffer_bytes=None,
   #socket_options= [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
   sock_chunk_bytes=4096, # undocumented experimental option
   sock_chunk_buffer_count=1000, # undocumented experimental option
   retry_backoff_ms=100,
   metadata_max_age_ms=300000,
   security_protocol='SASL_SSL',
   ssl_context=None,
   ssl_check_hostname=True,
   ssl_cafile=None,
   ssl_certfile=None,
   ssl_keyfile=None,
   ssl_password=None,
   ssl_crlfile=None,
   api_version=None,
   api_version_auto_timeout_ms=2000,
   #selector=selectors.DefaultSelector,
   sasl_mechanism='GSSAPI',
   #sasl_plain_username= None,
   #sasl_plain_password='XXX',
   sasl_kerberos_service_name='XXX',
   # metrics configs
   metric_reporters=[],
   metrics_num_samples=2,
   metrics_sample_window_ms=30000)

for msg in consumer:
    print(msg)

 

Please advise; your help is appreciated.

 

Thanks

1 REPLY

Expert Contributor

Traceback (most recent call last):
  File "consumer.py", line 8, in <module>
    consumer = KafkaConsumer('test',
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/consumer/group.py", line 355, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 242, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/client_async.py", line 907, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 1228, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 337, in connect_blocking
    self.connect()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 426, in connect
    if self._try_handshake():
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/kafka/conn.py", line 505, in _try_handshake
    self._sock.do_handshake()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1108)

 

I am getting the above error when I run the program. Any input?
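
The traceback fails during the TLS handshake, before any Kerberos exchange, which suggests the client does not trust the CA that signed the broker certificate. A minimal sketch of one possible fix is to hand kafka-python an SSL context that trusts that CA. The helper name `build_sasl_ssl_config` and the path `/path/to/cluster-ca.pem` are hypothetical, and `'XXX:1234'` / `'XXX'` are the placeholders from the question. This assumes the cluster's CA certificate is available as a PEM file.

```python
import ssl


def build_sasl_ssl_config(bootstrap_servers, kerberos_service_name, cafile=None):
    """Build KafkaConsumer keyword arguments for a Kerberized SASL_SSL listener.

    cafile: PEM file with the CA that signed the broker certificates
    (hypothetical path in the usage below). None falls back to the
    system trust store.
    """
    # create_default_context turns on certificate and hostname verification.
    # When cafile is given, only the CAs in that file are trusted.
    context = ssl.create_default_context(cafile=cafile)
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",
        "ssl_context": context,
        "sasl_mechanism": "GSSAPI",
        "sasl_kerberos_service_name": kerberos_service_name,
    }


# Placeholders copied from the question; no connection is made here.
config = build_sasl_ssl_config("XXX:1234", "XXX")

# Usage sketch (needs kafka-python installed and a reachable broker):
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer(
#       "test",
#       **build_sasl_ssl_config("XXX:1234", "XXX", cafile="/path/to/cluster-ca.pem"),
#   )
```

Once the handshake succeeds, the GSSAPI step still needs the Python `gssapi` package installed and a valid Kerberos ticket (for example from `kinit`) on the local machine.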