In this article, we will walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Hub. We use two Data Hubs: one created with the Data Engineering template and one with the Streams Messaging template, both in the same environment.
This article extends the official CDP documentation, Connecting Kafka clients to Data Hub provisioned clusters, to cover Spark applications.
Steps
1. Obtain the FreeIPA certificate of your environment:
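One way to retrieve the certificate, assuming the CDP CLI and jq are installed and configured (the environment name is a placeholder, and the output field name may vary between CLI versions):
cdp environments get-root-certificate \
    --environment-name <your-environment-name> \
    | jq -r '.contents' > freeipa_cert.pem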
2. Add the FreeIPA certificate to the truststore of the client.
The certificate needs to be added to the truststore of every client that you want to connect to the Data Hub provisioned cluster. The exact steps for adding the certificate to the truststore depend on the platform and key management software used. For example, you can use the Java keytool command-line tool:
keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]
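To confirm that the certificate was imported, you can list the contents of the truststore (you will be prompted for the truststore password):
keytool -list -keystore [CLIENT_TRUSTSTORE.JKS]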
3. Obtain CDP workload credentials:
A valid workload username and password have to be provided to the client; otherwise, it cannot connect to the cluster. Credentials can be obtained from the Management Console.
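If you have the CDP CLI configured, you can also look up your workload user name from the command line; the output field name below is an assumption based on current CLI versions:
cdp iam get-user | jq -r '.user.workloadUsername'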
4. Create a PySpark script to connect to Kafka
script.py
from pyspark.sql import SparkSession

# Create the Spark session
spark = SparkSession \
    .builder \
    .appName("PythonSQL") \
    .getOrCreate()

# Read from the Kafka topic as a streaming DataFrame. Replace the broker
# host, topic, truststore path/password, and workload credentials with
# your own values (see steps 2 and 3).
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
    .option("subscribe", "yourtopic") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.ssl.truststore.location", "/path/to/truststore.jks") \
    .option("kafka.ssl.truststore.password", "mypassword") \
    .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
    .load()

# Kafka keys and values are binary, so cast them to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Write the stream to the console and block until the query terminates
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
Note: In the code above, we specified the truststore location in the kafka.ssl.truststore.location option and the truststore password in the kafka.ssl.truststore.password option. The password provided here is the one chosen when the truststore was created in step 2. The workload username and password obtained in step 3 are supplied in the kafka.sasl.jaas.config option.
5. Kinit as a user with permissions to the Kafka topic
kinit username
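You can confirm that a valid Kerberos ticket was granted with klist:
klist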
6. Submit the spark job
To run the job on a local machine:
spark-submit \
    --master=local \
    ./script.py
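Depending on your environment, the Kafka source for Structured Streaming may not already be on the classpath. In that case, you can pull it in with --packages; the coordinates below are an assumption and must match your Spark and Scala versions:
spark-submit \
    --master=local \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
    ./script.py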
To run the job on the YARN cluster:
spark-submit --master=yarn \
    --files "truststore.jks" \
    --executor-cores 1 \
    ./script.py
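Note that files distributed with --files are placed in the working directory of each YARN container, so when running on YARN, the script should reference the truststore by its file name rather than an absolute path, for example:
.option("kafka.ssl.truststore.location", "truststore.jks")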
7. Validate that the job is running successfully
From the CDP home page, navigate to Data Hub Clusters > (drill down to the Data Engineering Data Hub) > Resource Manager > Applications > (drill down to the stdout logs for your job).
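Alternatively, from a host with the YARN CLI configured, you can fetch the application logs directly (the application ID is a placeholder):
yarn logs -applicationId <application_id>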