In this article, we’ll walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Engineering Experience (DEX). It extends the official CDP documentation, Connecting Kafka clients to Data Hub provisioned clusters, to cover Spark applications that run in DEX.
Steps
First, retrieve your environment's FreeIPA certificate (the documentation linked above describes how) and import it into a client truststore with keytool:

keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]
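As a concrete sketch, assuming the certificate was saved locally as cert.pem and the truststore is named keystore.jks to match the Spark code below (the alias and store password here are illustrative placeholders):

# Import the FreeIPA CA certificate into a new truststore;
# -noprompt skips the interactive "Trust this certificate?" confirmation.
keytool -import -noprompt -keystore keystore.jks -storepass mypassword -alias freeipa-ca -file cert.pem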
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PythonSQL") \
    .getOrCreate()
# Subscribe to the Kafka topic over the brokers' SASL_SSL listener (port 9093),
# authenticating with a CDP workload username and password via SASL/PLAIN.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
    .option("subscribe", "yourtopic") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.ssl.truststore.location", "/app/mount/keystore.jks") \
    .option("kafka.ssl.truststore.password", "mypassword") \
    .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
    .load()
# Kafka keys and values arrive as binary; cast them to strings for display.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Write the stream to the console sink and block until the query terminates.
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
Note: In the code above, we set the truststore location to /app/mount/keystore.jks in the kafka.ssl.truststore.location option (despite the file name, this file is a truststore, not a keystore). This is because when we upload our keystore.jks file to DEX later, it will be mounted under the /app/mount directory.
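If you prefer to script the upload instead of using the DEX web UI, the CDE CLI can create a file resource, upload the application and truststore into it, and mount it into the job. This is a minimal sketch, assuming the CLI is already configured against your virtual cluster; the resource name (spark-kafka-files), job name (spark-kafka-job), and application file (app.py) are illustrative placeholders, and exact flags can vary by CDE version:

# Create a file resource and upload the application script and truststore into it.
cde resource create --name spark-kafka-files
cde resource upload --name spark-kafka-files --local-path app.py
cde resource upload --name spark-kafka-files --local-path keystore.jks

# Create and run a job that mounts the resource; mounted files appear under /app/mount.
cde job create --name spark-kafka-job --type spark --application-file app.py --mount-1-resource spark-kafka-files
cde job run --name spark-kafka-job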