In this article, we'll walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Engineering Experience (DEX). It extends the official CDP documentation, Connecting Kafka clients to Data Hub provisioned clusters, to cover Spark applications run in Cloudera Data Engineering.

 

Steps

  1. Obtain the FreeIPA certificate of your environment:
    1. From the CDP home page, navigate to Management Console > Environments
    2. Locate and select your environment from the list of available environments
    3. Click Actions
    4. Select Get FreeIPA Certificate from the drop-down menu. The FreeIPA certificate downloads.
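
    If you prefer the command line, the same certificate can also be retrieved with the CDP CLI. The snippet below is a sketch only: the environment name is a placeholder, and you should confirm the exact syntax against your CDP CLI version.

    # Placeholder: "your-environment" is the name of your CDP environment.
    # The certificate contents are returned in the command's JSON output.
    cdp environments get-root-certificate --environment-name your-environment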

  2. Add the FreeIPA certificate to a Java keystore: All clients that you want to connect to the Data Hub provisioned cluster will need to use the certificate to communicate over TLS. The exact steps for adding the certificate to the truststore depend on the platform and key management software used. For example, you can use the Java keytool command line tool:

    keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]

    In this case, replace [CLIENT_TRUSTSTORE.JKS] with keystore.jks, since we want the newly created truststore file to be named keystore.jks; this is the file we will later upload to Cloudera Data Engineering. A filled-in version of the command is shown below.
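
    For example, assuming the downloaded FreeIPA certificate was saved as ca.crt and the alias freeipa is used (both are placeholders; substitute your own file name and alias), the command could look like the following. keytool prompts you to choose a truststore password, which you will need again in the Spark options in step 4:

    # Placeholders: ca.crt is the downloaded FreeIPA certificate, "freeipa" is an arbitrary alias.
    # keytool prompts for a truststore password and asks you to confirm trusting the certificate.
    keytool -import -keystore keystore.jks -alias freeipa -file ca.crt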

  3. Obtain CDP workload credentials: A valid workload username and password must be provided to the client; otherwise, it cannot connect to the cluster. Credentials can be obtained from the Management Console.
    1. From the CDP Home Page, navigate to Management Console > User Management.
    2. Locate and select the user account you want to use from the list of available accounts. The user details page displays information about the user.
    3. Find the Workload Username entry and note down the username.
    4. Find the Workload Password entry and click Set Workload Password.
    5. In the dialog box that appears, enter a new workload password, confirm the password and note it down.
    6. Fill out the Environment text box.
    7. Click Set Workload Password and wait for the process to finish.
    8. Click Close.

  4. Create a PySpark script to stream from a Kafka topic:
    script.py:
    from pyspark.sql import SparkSession
    
    spark = SparkSession\
        .builder\
        .appName("PythonSQL")\
        .getOrCreate()
    
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
      .option("subscribe", "yourtopic") \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.ssl.truststore.location", "/app/mount/keystore.jks") \
      .option("kafka.ssl.truststore.password", "mypassword") \
      .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
      .load()
    # Cast the Kafka key and value from bytes to strings (reassign so the cast takes effect)
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    
    
    # Write each micro-batch to the console so the streamed records appear in the job's stdout
    query = df.writeStream \
      .outputMode("append") \
      .format("console") \
      .start()
    query.awaitTermination()
    Note: In the above code, we set the truststore location to /app/mount/keystore.jks in the kafka.ssl.truststore.location option. This is because when we upload the keystore.jks file to Cloudera Data Engineering later, it will be placed in the /app/mount directory.

    Note: We have specified the truststore password in the kafka.ssl.truststore.password option. This is the password we chose when creating keystore.jks in step 2.

    Finally, note that the workload username and password from step 3 are specified in the kafka.sasl.jaas.config option. If you prefer not to hard-code these values in the script, one possible alternative is sketched below.
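
    As a minimal sketch (assumptions: the job is launched with three positional arguments in the order shown; CDE lets you supply job arguments when creating or running a job; argument validation is omitted), the sensitive values could be passed in as arguments instead of being hard-coded:

    import sys
    from pyspark.sql import SparkSession

    # Assumed invocation: script.py <workload_username> <workload_password> <truststore_password>
    workload_user, workload_password, truststore_password = sys.argv[1:4]

    spark = SparkSession.builder.appName("PythonSQL").getOrCreate()

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
      .option("subscribe", "yourtopic") \
      .option("kafka.security.protocol", "SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.ssl.truststore.location", "/app/mount/keystore.jks") \
      .option("kafka.ssl.truststore.password", truststore_password) \
      .option("kafka.sasl.jaas.config",
              'org.apache.kafka.common.security.plain.PlainLoginModule required '
              'username="{}" password="{}";'.format(workload_user, workload_password)) \
      .load()
    # The rest of the script (selectExpr, writeStream, awaitTermination) is unchanged.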

  5. Create a Cloudera Data Engineering Experience Job 
    1. From the CDP Home Page, navigate to CDP Cloudera Data Engineering > Jobs > Create Job.
    2. Upload the script.py file to Cloudera Data Engineering (via the Job Details > Upload File text box) and the keystore.jks file (via Advanced Options > Other Dependencies text box).

      [Screenshot: DEX - Files - Annotated.png]

    3. Name the job using the Job Details > Name text box.
    4. Unselect Schedule
    5. Click Run now
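
    Alternatively, this step can be scripted with the CDE CLI. The following is a sketch only: the resource and job names are placeholders, and the exact flag names may differ between CDE CLI versions, so verify them with cde --help before relying on this.

    # Placeholders: "kafka-stream-files" (file resource name) and "kafka-stream-job" (job name).
    # Create a file resource, upload the script and truststore, then create and run the job.
    cde resource create --name kafka-stream-files
    cde resource upload --name kafka-stream-files --local-path script.py
    cde resource upload --name kafka-stream-files --local-path keystore.jks
    cde job create --name kafka-stream-job --type spark --mount-1-resource kafka-stream-files --application-file script.py
    cde job run --name kafka-stream-job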

  6. Validate that the Job is running successfully
    1. Navigate to CDP Cloudera Data Engineering > Job Runs.
    2. Drill down into the Run ID / Job.

      [Screenshot: DEX - Job Runs.PNG]

    3. Navigate to Actions > Logs

      [Screenshot: DEX - Job Run.PNG]

    4. Select the stdout tab

      [Screenshot: DEX - Job Stdout.PNG]
