Community Articles
Find and share helpful community-sourced technical articles.
Cloudera Employee

In this article, we will walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Hub. We use two Data Hubs, one with a Data Engineering Template, and another with a Streams Messaging template. Both Data Hubs were created in the same environment.

 

This article extends the official CDP documentation: Connecting Kafka clients to Data Hub provisioned clusters to include Spark applications.

 

Steps

1. Obtain the FreeIPA certificate of your environment:

    1. From the CDP Home Page, navigate to Management Console > Environments
    2. Locate and select your environment from the list of available environments
    3. Click Actions
    4. Select Get FreeIPA Certificate from the drop-down menu. The FreeIPA certificate downloads.

2. Add the FreeIPA certificate to the truststore of the client.

The certificate needs to be added for all clients that you want to connect to the Data Hub provisioned cluster. The exact steps of adding the certificate to the truststore depends on the platform and key management software used. For example, you can use the Java keytool command line tool:

 

keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]

 

3. Obtain CDP workload credentials:

A valid workload username and password has to be provided to the client, otherwise it cannot connect to the cluster. Credentials can be obtained from Management Console.

  1. From the CDP Home Page, navigate to Management Console > User Management
  2. Locate and select the user account you want to use from the list of available accounts. (The user details page displays information about the user.)
  3. Find the username found in the Workload Username entry and note it down
  4. Find the Workload Password entry and click Set Workload Password
  5. In the dialog box that appears, enter a new workload password, confirm the password and note it down
  6. Fill out the Environment text box
  7. Click Set Workload Password and wait for the process to finish
  8. Click Close

4. Create a pyspark script to connect to Kafka

script.py

 

from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .getOrCreate()

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
  .option("subscribe", "yourtopic") \
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.mechanism", "PLAIN") \
  .option("kafka.ssl.truststore.location", "/path/to/keystore.jks") \
  .option("kafka.ssl.truststore.password", "mypassword") \
  .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


query = df.writeStream \
  .outputMode("append") \
  .format("console") \
  .start() \
  
query.awaitTermination()

 

Note, in the above code, we have specified our keystore location in our option (kafka.ssl.truststore.location), and our keystore password in the kafka.ssl.truststore.password option. The password we provide here is the password that we provided for our keystore at the time of its creation.

Note: We have specified our workload username and password in the "kafka.sasl.jaas.config" option.  

 

5. Kinit as a user with permissions to the Kafka topic

 

kinit username

 

6. Submit the spark job

To run the job on a local machine

 

spark-submit 
--master=local   
./script.py

 

To run the job on the YARN cluster

 

spark-submit 	--master=yarn   \
		       --files "mykeystore.jks" \
    		    --executor-cores 1 \
		         ./script.py

 

7. Validate that the Job is running successfully

From the CDP Home Page, navigate to Data Hub Clusters > (Drill down to the Data Engineering Data Hub) > Resource Manager > Applications > (Drill down to the stdout logs for your Job)

543 Views
0 Kudos
Don't have an account?
Version history
Last update:
‎01-10-2021 07:42 PM
Updated by:
Contributors
Top Kudoed Authors