
Over the last few weeks, as customers have started to ramp up their usage of CDP cloud assets (like DataHub, Experiences, etc.), I have observed many of the ways they are leveraging on-prem engineering assets (code) in the cloud. The concept of write-once-deploy-anywhere is fundamental to a well-designed data strategy. It's NOT a sales pitch. It's a reality for enterprises who have invested in a Modern Data Architecture.

However, unlike on-prem, where storage and services are tightly coupled, CDP flips that concept on its head. We can now launch services independently and choose only the capabilities we need for the task at hand. For example, streaming use cases typically require NiFi, Kafka, and Spark Streaming. Each of those services can run as a separate DataHub cluster and scale independently.

This article focuses on using PySpark to read from a secured Kafka instance. To be clear, this is one way (not the only way) of using PySpark to read from Kafka. Both clusters (DE and SM) are launched via the CDP control plane.


CDP DataHub assets used in this article

  • Data Engineering Cluster (DE)
  • Streams Messaging (SM)


Launching DataHub DE and SM clusters is well documented here.






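  • Create a Kafka client properties file (used later by the console producer) with the following contents:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.password=password
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="machine-user" password="password";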




  • Create a Kafka JAAS file called jaas.conf (you can name it whatever you like):




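KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="machine-user"
password="password";
};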
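The properties file and jaas.conf carry the same credentials, so it can help to render both from one set of values. This is a minimal sketch, not the author's method: the function names are hypothetical, the placeholder values mirror the ones used in this article, and it assumes the same SASL_SSL/PLAIN settings used in the PySpark step plus an `ssl.truststore.location` entry, which is an assumption about where your truststore lives.

```python
# Hypothetical helpers: render jaas.conf and the Kafka client properties
# from one set of credentials so the two files stay consistent.
PLAIN_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def _check(value: str, name: str) -> str:
    # A double quote would break the quoted JAAS option values; reject it
    # rather than attempting to escape it.
    if '"' in value:
        raise ValueError(f"{name} must not contain a double quote")
    return value


def jaas_conf(username: str, password: str) -> str:
    """Contents of jaas.conf with a KafkaClient section for SASL/PLAIN."""
    u, p = _check(username, "username"), _check(password, "password")
    return (
        "KafkaClient {\n"
        f"{PLAIN_LOGIN_MODULE} required\n"
        f'username="{u}"\n'
        f'password="{p}";\n'
        "};\n"
    )


def client_properties(username: str, password: str,
                      truststore_location: str, truststore_password: str) -> str:
    """Contents of a client properties file for SASL_SSL + PLAIN."""
    u, p = _check(username, "username"), _check(password, "password")
    lines = [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        f"ssl.truststore.location={truststore_location}",  # assumed location
        f"ssl.truststore.password={truststore_password}",
        f'sasl.jaas.config={PLAIN_LOGIN_MODULE} required username="{u}" password="{p}";',
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values; substitute your own workload user and truststore details.
    print(jaas_conf("machine-user", "password"))
    print(client_properties("machine-user", "password",
                            "kafka.client.truststore.jks", "password"))
```

Printing rather than writing files keeps the sketch free of side effects; redirect the output into the two files as needed.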





Create a Kafka topic

The easiest way to create a Kafka topic is via SMM (Streams Messaging Manager), which ships with the Streams Messaging cluster. Click on the SMM URL within DataHub and then click on "Topics" in the right menu bar. Click on "Add New" to create a new Kafka topic.



Enter the topic name "demo", set partitions to 1, and set the cleanup policy to "delete".



The demo topic should now be available.


Generate Data

For PySpark to have something to consume from the secured Kafka instance, we first need to write data to Kafka. Here we will use the Kafka console producer.

  • SSH into one of the broker nodes
  • Update the --broker-list value with your broker hosts and ports
  • Upload the Kafka client properties file (created earlier) onto this node
  • Update the --producer.config argument with the location of that file

After the command below is executed, you can start writing data (messages) to Kafka. We will come back to this in a moment.




kafka-console-producer --broker-list <broker1>:<port>,<broker2>:<port>,<broker3>:<port> --producer.config /home/csunilemanjee/ --topic demo
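If typing messages by hand gets tedious, a small script can generate them. This is a hypothetical helper (the message format and file name are made up for illustration); it relies on the console producer reading standard input and sending each line as one message.

```python
import json
import sys
from datetime import datetime, timezone


def messages(count: int):
    """Yield `count` JSON-encoded demo messages (hypothetical format)."""
    for seq in range(count):
        yield json.dumps({
            "id": seq,
            "ts": datetime.now(timezone.utc).isoformat(),
            "text": f"hello kafka #{seq}",
        })


if __name__ == "__main__":
    # The console producer sends each stdin line as one message,
    # so print exactly one JSON document per line.
    n = int(sys.argv[1]) if len(sys.argv) > 1 else 10
    for line in messages(n):
        print(line, flush=True)
```

Saved as, say, `gen_messages.py`, its output can be piped into the producer instead of typing: `python3 gen_messages.py 20 | kafka-console-producer --broker-list ... --producer.config ... --topic demo`, with the same arguments as above.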





Read from Kafka using PySpark

  • SSH into any node within the DE cluster
  • Upload jaas.conf and kafka.client.truststore.jks
  • Update the locations of jaas.conf and kafka.client.truststore.jks in the command
  • Launch the PySpark shell using the following command





pyspark --files "/home/csunilemanjee/jaas.conf#jaas.conf,/home/sunilemanjee/kafka.client.truststore.jks#kafka.client.truststore.jks" --driver-java-options "" --conf ""
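The `--files` value pairs each local path with an alias after `#`. Spark on YARN supports this Hadoop-style syntax and places each file in the executors' working directory under its alias, which is presumably why the stream options can refer to `./kafka.client.truststore.jks`. A minimal sketch (hypothetical helper and home directory) that builds the value from one directory, so both entries point at the same place:

```python
import shlex
from pathlib import PurePosixPath


def files_arg(*paths: str) -> str:
    """Build a --files value in `path#alias` form, aliasing each file to its base name."""
    if not paths:
        raise ValueError("at least one path is required")
    return ",".join(f"{p}#{PurePosixPath(p).name}" for p in paths)


if __name__ == "__main__":
    home = "/home/your-user"  # hypothetical; use the directory you uploaded the files to
    value = files_arg(f"{home}/jaas.conf", f"{home}/kafka.client.truststore.jks")
    # shlex.quote keeps the value safe to paste into a shell command line.
    print("pyspark --files " + shlex.quote(value))
```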





Once the PySpark shell is up, it may be easier to store the Kafka brokers in a variable like this:
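A minimal sketch of that variable, with hypothetical hostnames. The helper name is made up, and the default port of 9093 is an assumption; use whichever port your brokers' SASL_SSL listener exposes.

```python
def broker_list(hosts, port: int = 9093) -> str:
    """Join broker hostnames into a bootstrap servers string: host1:port,host2:port,..."""
    hosts = [h.strip() for h in hosts if h.strip()]  # drop blanks and stray spaces
    if not hosts:
        raise ValueError("at least one broker host is required")
    return ",".join(f"{h}:{port}" for h in hosts)


# Hypothetical hostnames; copy the real broker FQDNs from your Streams Messaging cluster.
KAFKA_BROKERS = broker_list([
    "broker-1.example.internal",
    "broker-2.example.internal",
    "broker-3.example.internal",
])
```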








Create a structured stream to read from Kafka. Update the following options:

  • kafka.ssl.truststore.location
  • kafka.ssl.truststore.password
  • username
  • password




# Build a streaming DataFrame; the console sink is started in the next step.
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "demo")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.ssl.truststore.location", "./kafka.client.truststore.jks")
    .option("kafka.ssl.truststore.password", "password")
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="machine-user" password="password" serviceName="kafka";',
    )
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)
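If you prefer to keep the settings out of one long chain, they can be collected in a dict first. A minimal sketch with a hypothetical helper name; it mirrors the Kafka options above but leaves out `serviceName`, which, as far as I know, only matters for Kerberos (GSSAPI) rather than PLAIN.

```python
PLAIN_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def kafka_sasl_ssl_options(brokers: str, topic: str,
                           truststore_location: str, truststore_password: str,
                           username: str, password: str) -> dict:
    """Hypothetical helper: Spark Kafka source options for SASL_SSL + PLAIN."""
    for name, value in (("username", username), ("password", password)):
        # Quotes would break the quoted JAAS option values.
        if '"' in value:
            raise ValueError(f"{name} must not contain a double quote")
    jaas = f'{PLAIN_LOGIN_MODULE} required username="{username}" password="{password}";'
    return {
        "kafka.bootstrap.servers": brokers,
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.ssl.truststore.location": truststore_location,
        "kafka.ssl.truststore.password": truststore_password,
        "kafka.sasl.jaas.config": jaas,
    }


if __name__ == "__main__":
    opts = kafka_sasl_ssl_options(
        brokers="broker-1.example.internal:9093",  # hypothetical host and port
        topic="demo",
        truststore_location="./kafka.client.truststore.jks",
        truststore_password="password",
        username="machine-user",
        password="password",
    )
    for key in sorted(opts):  # print keys only, to keep secrets off the console
        print(key)
```

In the PySpark shell the dict can then be applied in one call, since `DataStreamReader.options` accepts keyword arguments: `df_kafka = spark.readStream.format("kafka").options(**opts).load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")`.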




To start viewing Kafka messages from the "demo" topic on the console (PySpark shell), run:




stream = df_kafka.writeStream.format("console").start()

# Once you are finished, stop the stream:
stream.stop()




Go back to your Kafka console producer and start writing messages (anything you like). You will see those messages show up in your PySpark shell console.


That's it. Again, this is one way (not the only way) to use PySpark to consume from a secured Kafka instance. I see it as an emerging pattern in CDP for streaming use cases. Enjoy.



Version history: revision 11 of 11, last updated 01-15-2020 01:29 PM