This article is inspired by another article, Spark Structured Streaming example with CDE, which explains how to use CDE to read from and write to Kafka using Spark Streaming. This article differs in that it explains how to connect to Kafka on a CDP Streams Messaging data hub cluster.

 

Kafka on a CDP Streams Messaging data hub cluster differs in the following ways:

  • Kafka of CDP datahub is SASL_SSL enabled.
  • CDE needs the CA certificate of Kafka to connect.

Prerequisites

  • Knowledge of CDP, Nifi, Kafka, and CDE
  • CDE cluster provisioned from CDP Environment
  • CDP Flow Management Datahub Cluster (NiFi) 
  • CDP Streams Messaging Datahub Cluster (Kafka, Schema Registry)
  • CDE CLI

The assumption is that the CDP environment, Data Lake, Flow Management, and Streams Messaging datahub clusters are already created.

Flow Management Datahub Cluster

  1. Once the above clusters are provisioned, set up NiFi to read tweets from Twitter and publish them to Kafka.
  2. Upload this NiFi template to NiFi to set up the Read Twitter > Push Kafka pipeline:
    Screenshot 2020-09-21 at 4.28.40 PM.png
  3. Edit the KafkaProducer processor and set the Kafka broker to the hostname of the Kafka broker in the Streams Messaging cluster.
  4. Edit the GetTwitter processor group with the Twitter developer credentials (access key, secret).

Streams Messaging Datahub Cluster

Nothing to be done  🙂 

CDE Cluster

The overall goal of the Spark job is to read from the Kafka topic named "twitter", which NiFi populates, extract the text from the tweet messages, and push it to another Kafka topic named "tweet".
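The per-message transformation can be sketched in isolation. The snippet below is a minimal illustration in Python, not the code from the linked repository (which is written in Scala). It assumes that each Kafka message value is a tweet JSON object with a top-level "text" field; the function name extract_tweet_text is hypothetical.

```python
import json
from typing import Optional


def extract_tweet_text(raw_message: str) -> Optional[str]:
    """Return the tweet text from a raw JSON message, or None if unusable.

    Assumption: the message is a JSON object with a top-level "text" field.
    """
    try:
        tweet = json.loads(raw_message)
    except json.JSONDecodeError:
        # Malformed messages are skipped rather than failing the stream.
        return None
    if not isinstance(tweet, dict):
        return None
    text = tweet.get("text")
    return text if isinstance(text, str) else None


if __name__ == "__main__":
    sample = '{"id": 1, "text": "Spark on CDE", "lang": "en"}'
    print(extract_tweet_text(sample))  # prints: Spark on CDE
```

In a streaming job, logic of this kind would be applied to the value of every record read from "twitter" before the result is written to "tweet".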

 

Note: The Spark streaming job needs to talk to Kafka over SSL, which means the CA certificate of Kafka must be configured in the Spark job. The JRE of the default Spark Docker image already has the CA certificate of Kafka. The following link points to the code that references it:

https://github.com/bgsanthosh/spark-streaming/blob/master/spark-streaming/src/main/scala/org/cloudera/qe/e2es/TwitterStructureStreaming.scala#L23
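As a rough illustration of what a SASL_SSL connection involves on the client side, the sketch below builds the Kafka options that Spark's Kafka source accepts (standard Kafka client settings with a "kafka." prefix). It is not taken from the linked code. The PLAIN SASL mechanism, the user name and password parameters, and the function name kafka_sasl_ssl_options are assumptions; check which mechanism the cluster actually uses. When no truststore path is given, the JVM default truststore applies, which is the case the note above describes.

```python
from typing import Dict, Optional


def kafka_sasl_ssl_options(
    brokers: str,
    username: str,
    password: str,
    truststore_path: Optional[str] = None,
    truststore_password: Optional[str] = None,
) -> Dict[str, str]:
    """Build Kafka source/sink options for a SASL_SSL-enabled cluster.

    Assumption: the cluster authenticates clients with SASL/PLAIN.
    """
    jaas_config = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    options = {
        "kafka.bootstrap.servers": brokers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }
    # Only set a truststore when a custom one is supplied; otherwise the
    # default truststore of the JRE in the Spark image is used.
    if truststore_path:
        options["kafka.ssl.truststore.location"] = truststore_path
        if truststore_password:
            options["kafka.ssl.truststore.password"] = truststore_password
    return options


if __name__ == "__main__":
    # Placeholder values, not real hosts or credentials.
    for key, value in kafka_sasl_ssl_options("broker-host:9093", "user", "secret").items():
        print(key, "=", value)
```

In a PySpark job, a dictionary like this could be passed with spark.readStream.format("kafka").options(**opts), together with a "subscribe" option naming the topic.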

 

To configure a CA certificate of Kafka in the Spark job, do the following: 

 

Note: For a custom truststore, the CA certificate of Kafka can be downloaded from the Control Plane UI ("Get FreeIPA Certificate"), and the Docker path to it can be provided in the job.

Screenshot 2020-09-21 at 4.29.16 PM.png

 

  1. Build the Spark streaming application from the following repo:
    https://github.com/bgsanthosh/spark-streaming/tree/master/spark-streaming
  2. Create a virtual cluster. For more details, see Creating virtual clusters.
  3. Configure the CDE CLI to point to the virtual cluster created in the above step. For more details, see Configuring the CLI client.
  4. Create a resource using the following command:
    cde resource create --name spark-streaming
  5. Upload the Spark job JAR built in step 1 using the following command:
    cde resource upload --name spark-streaming --local-path spark-streaming.jar --resource-path spark-streaming.jar
  6. Create a CDE job definition file and import it using the following commands:
    cat twitter-streaming.json
    
    {
      "mounts": [
        {
          "resourceName": "spark-streaming"
        }
      ],
      "name": "twitter-streaming",
      "spark": {
        "className": "org.cloudera.qe.e2es.TwitterStructuredStreaming",
        "args": [
        ],
        "driverCores": 1,
        "driverMemory": "1g",
        "executorCores": 1,
        "executorMemory": "1g",
        "file": "spark-streaming.jar",
        "pyFiles": [],
        "files": [],
        "numExecutors": 4
      }
    }
    
    cde job import --file twitter-streaming.json
  7. Run the Spark Streaming job using the following command:
    cde job run --name twitter-streaming
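The job definition in step 6 has an empty "args" list, and a reader comment on this article suggests the job may need the Kafka brokers passed as an argument. The sketch below generates the same definition with a caller-supplied argument list. The function name build_job_definition is hypothetical, and the argument value shown is only a placeholder: the article does not state what arguments the job expects or in what format.

```python
import json
from typing import Any, Dict, List


def build_job_definition(
    name: str,
    resource: str,
    class_name: str,
    jar: str,
    args: List[str],
) -> Dict[str, Any]:
    """Return a CDE job definition mirroring the one in step 6."""
    return {
        "mounts": [{"resourceName": resource}],
        "name": name,
        "spark": {
            "className": class_name,
            "args": list(args),
            "driverCores": 1,
            "driverMemory": "1g",
            "executorCores": 1,
            "executorMemory": "1g",
            "file": jar,
            "pyFiles": [],
            "files": [],
            "numExecutors": 4,
        },
    }


if __name__ == "__main__":
    definition = build_job_definition(
        name="twitter-streaming",
        resource="spark-streaming",
        class_name="org.cloudera.qe.e2es.TwitterStructuredStreaming",
        jar="spark-streaming.jar",
        # Placeholder only: replace with whatever the job actually expects.
        args=["<kafka-broker-host>:<port>"],
    )
    print(json.dumps(definition, indent=2))
```

Redirecting the output to twitter-streaming.json gives a file that can be imported with the cde job import command from step 6.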

There you go! Spark Streaming on CDE is up and running!

References

To create a CDP environment and provision the Flow Management, Streams Messaging, and CDE clusters, see the following documents:

 

Comments

Thanks for the article, Santosh. I think we need to update the job definition with kbrokers as an argument for the job to run.