Created on 01-15-2020 11:57 AM - edited on 04-21-2026 04:33 AM by GrazittiAPI
Over the last few weeks, as customers have started to ramp up their usage of CDP cloud assets (like DataHub, Experiences, etc.), I have observed many of the ways they are leveraging on-prem engineering assets (code) in the cloud. The concept of write-once-deploy-anywhere is fundamental to a well-designed data strategy. It's NOT a sales pitch. It's a reality for enterprises who have invested in a Modern Data Architecture. However, unlike on-prem, where storage and services are tightly coupled, CDP flips that concept on its head: we can now launch services independently and choose only the capabilities we need for the task at hand. For example, streaming use cases typically require NiFi, Kafka, and Spark Streaming. Each of those services would be a separate DataHub cluster and would scale independently.
This article focuses on using PySpark to read from a secured Kafka instance. To be clear, this is one way (not the only way) of using PySpark to read from Kafka. Both clusters (DE and SM) are launched via the CDP control plane.
CDP DataHub assets used in this article
Data Engineering Cluster (DE)
Streams Messaging (SM)
Launching DataHub DE and SM clusters is well documented here.
The easiest way to create a Kafka topic is via SMM (Streams Messaging Manager), which ships with the Streams Messaging cluster. Click the SMM URL within DataHub, then click "Topics" on the right menu bar. Click "Add New" to create a new Kafka topic.
Enter the topic name "demo", set partitions to 1, and set the cleanup policy to "delete".
The demo topic should now be available.
Generate Data
To test PySpark consuming from a secured Kafka instance, we first need a way to write messages to Kafka. Here we will use the Kafka console producer.
SSH into one of the broker nodes
Update k*.cloudera.com:9093 with your broker list and ports
Upload kafka.properties (created earlier) onto this node
Update the location of your kafka.properties file
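The kafka.properties file referenced above is not shown in this article. As a rough sketch, assuming the same SASL_SSL/PLAIN settings that the Spark reader uses later, a client config could be rendered like this. The truststore path, user name, and passwords are placeholders, and the values are assumed to contain no characters that need escaping in a Java properties file.

```python
def render_kafka_properties(truststore: str, truststore_password: str,
                            username: str, password: str) -> str:
    """Return Kafka client properties for SASL_SSL with the PLAIN mechanism."""
    # The JAAS entry must end with a semicolon; options are space-separated.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    lines = [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        f"ssl.truststore.location={truststore}",
        f"ssl.truststore.password={truststore_password}",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Placeholder values (assumptions): replace with your own before use.
    print(render_kafka_properties(
        truststore="/path/to/kafka.client.truststore.jks",
        truststore_password="password",
        username="machine-user",
        password="password",
    ), end="")
```

Redirecting the printed output into kafka.properties gives a file that can be uploaded to the broker node.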
Once the command below is running, you can start writing data (messages) to Kafka. We will come back to this in a moment.
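The console-producer command itself is not reproduced here. A minimal sketch of how it could look, assuming the kafka-console-producer wrapper is on the PATH of the broker node, builds the argument list in Python and prints it as a shell command. The broker list and properties path are placeholders, and this is not necessarily the exact command the author used.

```python
from __future__ import annotations

import shlex


def console_producer_argv(brokers: str, topic: str, config_path: str) -> list[str]:
    """Build the kafka-console-producer argument list for a secured cluster."""
    return [
        "kafka-console-producer",  # assumed to be on PATH on the broker node
        "--broker-list", brokers,  # comma-separated host:port pairs
        "--topic", topic,
        "--producer.config", config_path,  # SASL_SSL client settings
    ]


if __name__ == "__main__":
    argv = console_producer_argv(
        brokers="<broker1>:9093,<broker2>:9093",  # placeholder broker list
        topic="demo",
        config_path="/path/to/kafka.properties",  # placeholder upload location
    )
    print(shlex.join(argv))
```

Paste the printed command into your SSH session, or pass argv to subprocess.run(argv, check=True) to launch it from Python. Each line you type becomes one message; Ctrl+C exits.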
Create a structured stream that reads from Kafka. Update the following values:
kafka.ssl.truststore.location
kafka.ssl.truststore.password
username
password
# Run in the PySpark shell, where `spark` is already defined.
KAFKA_BROKERS = "<broker1>:9093,<broker2>:9093"  # placeholder: replace with your broker list
df_kafka = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "demo")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.ssl.truststore.location", "./kafka.client.truststore.jks")
    .option("kafka.ssl.truststore.password", "password")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"machine-user\" password=\"password\" serviceName=\"kafka\";")
    .load()
    # Kafka delivers key and value as binary; cast them to strings for display.
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
# df_kafka is a streaming DataFrame; the query is started in the next step.
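Because the JAAS string is easy to get wrong (escaped quotes, spaces between options, the trailing semicolon), one option is to build the reader options in a small helper and pass them with DataStreamReader.options(). The function name and arguments below are hypothetical, and the sketch leaves out serviceName, which the PLAIN login module does not use.

```python
def kafka_sasl_ssl_options(brokers, topic, truststore, truststore_password,
                           username, password):
    """Return Spark Kafka-source options for SASL_SSL with the PLAIN mechanism."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": brokers,
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.ssl.truststore.location": truststore,
        "kafka.ssl.truststore.password": truststore_password,
        "kafka.sasl.jaas.config": jaas,
    }


# Usage in the PySpark shell (placeholder values):
# opts = kafka_sasl_ssl_options(KAFKA_BROKERS, "demo", "./kafka.client.truststore.jks",
#                               "password", "machine-user", "password")
# df_kafka = (spark.readStream.format("kafka").options(**opts).load()
#             .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
```

Keeping the Kafka settings in one dictionary also makes it easy to reuse them for a batch read with spark.read.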
To start viewing Kafka messages from the "demo" topic on the console (PySpark shell):
stream = df_kafka.writeStream.format("console").start()
stream.awaitTermination()
# awaitTermination() blocks the shell; press Ctrl+C to get the prompt back, then stop the stream:
stream.stop()
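Because awaitTermination() without arguments blocks until the query ends, an alternative for a quick demo is to give it a timeout in seconds (it then returns whether the query terminated) and stop the query afterwards. run_for is a hypothetical helper, not part of PySpark.

```python
def run_for(query, seconds):
    """Let a StreamingQuery run for up to `seconds`, then stop it if still active.

    Returns True if the query terminated on its own within the timeout.
    """
    # With a timeout, awaitTermination returns a bool instead of blocking forever.
    terminated = query.awaitTermination(seconds)
    if not terminated and query.isActive:
        query.stop()
    return terminated


# Usage in the PySpark shell:
# stream = df_kafka.writeStream.format("console").start()
# run_for(stream, 60)  # show console output for about a minute, then stop
```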
Go back to your Kafka console producer and start writing messages (anything you like). You will see those messages show up in your PySpark shell console.
That's it. Again, this is one way (not the only way) to use PySpark to consume from a secured Kafka instance. I see it as an emerging pattern in CDP for streaming use cases. Enjoy.