
Recently I needed to connect Spark, running in my IntelliJ IDE, to a Kafka DataHub. I don't claim to be a pro at secure IDE setup, so fellow novices in the security realm may find this article useful.

 

This article walks through setting up a Spark Scala IDE (IntelliJ), with a working code example, to connect securely to a Kafka DataHub over the SASL_SSL protocol using the PLAIN SASL mechanism.

Artifacts

Prerequisites

  • A Kafka DataHub instance
  • Permissions set up in Ranger to read from and write to Kafka
  • IntelliJ (or similar) with the Scala plugin installed
  • Workload username and password

TrustStore

Andre Sousa Dantas De Araujo did a great job explaining (very simply) how to get the certificate from CDP and create a truststore. It takes only a few simple steps.

I stored the truststore at the following location on my local machine, which is referenced in the Spark Scala code:

 

 

./src/main/resources/truststore.jks
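Before pointing Spark at the truststore, it can help to confirm that the file actually loads. The following is a minimal sketch and not part of the original code: the object name TruststoreCheck, the helper listAliases, and the password argument are assumptions for illustration. Use whatever password you chose when creating the truststore.

```scala
import java.io.FileInputStream
import java.security.KeyStore
import scala.collection.mutable.ListBuffer
import scala.util.Try

object TruststoreCheck {
  // Loads a JKS truststore from `path` and returns the certificate aliases it holds.
  // Any problem (missing file, wrong password, corrupt store) surfaces as a Failure.
  def listAliases(path: String, password: String): Try[List[String]] = Try {
    val in = new FileInputStream(path)
    try {
      val ks = KeyStore.getInstance("JKS")
      ks.load(in, password.toCharArray)
      val aliases = ks.aliases()
      val found = ListBuffer.empty[String]
      while (aliases.hasMoreElements) found += aliases.nextElement()
      found.toList
    } finally in.close()
  }
}
```

Calling TruststoreCheck.listAliases("./src/main/resources/truststore.jks", "YOUR-TRUSTSTORE-PASSWORD") should return a Success containing at least one alias if the certificate was imported correctly.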

 

JAAS Setup

Create a jaas.conf file:

 

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="YOUR-WORKLOAD-USER"
password="YOUR-WORKLOAD-PASSWORD";
};

 

I stored mine at the following location, which is referenced in the Spark Scala code:

 

./src/main/resources/jaas.conf
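As an aside, Kafka clients also accept the same login entry inline through the sasl.jaas.config client property, which Spark's Kafka connector would receive as the option kafka.sasl.jaas.config. This article uses the jaas.conf file instead, so treat the following as an optional sketch: the object name InlineJaas and the helper plainJaasConfig are hypothetical and only build the string.

```scala
object InlineJaas {
  // Escape backslashes and double quotes so the value stays inside its quoted field.
  private def escape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  // Builds the single-line JAAS entry for the PLAIN mechanism,
  // equivalent to the body of the KafkaClient section in jaas.conf.
  def plainJaasConfig(user: String, password: String): String =
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username=\"" + escape(user) + "\" " +
      "password=\"" + escape(password) + "\";"
}
```

The result could then be passed as .option("kafka.sasl.jaas.config", InlineJaas.plainJaasConfig(user, password)) on the reader or writer, which avoids depending on the JVM-wide login config file.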

 

Spark Session (Scala Code)

  • Set master to local
  • Set spark.driver.extraJavaOptions and spark.executor.extraJavaOptions to point to the location of your jaas.conf
  • Set spark.kafka.ssl.truststore.location to the location of your truststore

 

    import org.apache.spark.sql.SparkSession

    // kbrokers holds the Kafka broker list supplied as a program argument
    val spark = SparkSession.builder
      .appName("Spark Kafka Secure Structured Streaming Example")
      .master("local") // run Spark inside the IDE process
      .config("spark.kafka.bootstrap.servers", kbrokers)
      .config("spark.kafka.sasl.kerberos.service.name", "kafka")
      .config("spark.kafka.security.protocol", "SASL_SSL")
      .config("kafka.sasl.mechanism", "PLAIN")
      // point the driver and executor JVMs at the JAAS login configuration
      .config("spark.driver.extraJavaOptions", "-Djava.security.auth.login.config=./src/main/resources/jaas.conf")
      .config("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=./src/main/resources/jaas.conf")
      .config("spark.kafka.ssl.truststore.location", "./src/main/resources/truststore.jks")
      .getOrCreate()

 

Write to Kafka

The streaming DataFrame is populated from a CSV file. Here I simply read the DataFrame and write it back out to a Kafka topic.

 

 

    // The Kafka sink sends the column named value as the record payload
    val ds = streamingDataFrame.selectExpr("CAST(id AS STRING)", "CAST(text AS STRING) as value")
      .writeStream.format("kafka")
      .outputMode("update")
      .option("kafka.bootstrap.servers", kbrokers) // broker list from the program arguments
      .option("topic", ktargettopic)               // target topic from the program arguments
      .option("kafka.sasl.kerberos.service.name", "kafka")
      .option("kafka.ssl.truststore.location", "./src/main/resources/truststore.jks")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("checkpointLocation", "/tmp/spark-checkpoint2/") // streaming writes to Kafka need a checkpoint location
      .start()
      .awaitTermination()
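If the same security settings are needed in more than one reader or writer, they can be collected once and reused. This is a minimal sketch under stated assumptions: the object name KafkaSecurity and the helper saslSslOptions are hypothetical, and kafka.sasl.kerberos.service.name is left out on the assumption that it is only consulted for Kerberos (GSSAPI) authentication, not for PLAIN.

```scala
object KafkaSecurity {
  // Keys carry the "kafka." prefix so that Spark's Kafka connector
  // forwards them to the underlying Kafka client.
  def saslSslOptions(brokers: String, truststorePath: String): Map[String, String] = Map(
    "kafka.bootstrap.servers"       -> brokers,
    "kafka.security.protocol"       -> "SASL_SSL",
    "kafka.sasl.mechanism"          -> "PLAIN",
    "kafka.ssl.truststore.location" -> truststorePath
  )
}
```

The map can be applied in one call with .options(KafkaSecurity.saslSslOptions(kbrokers, "./src/main/resources/truststore.jks")) in place of the individual .option(...) lines for those keys.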

 

 

Run

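In the IntelliJ run configuration, supply a JVM option that gives the location of your jaas.conf. In local mode the driver JVM has already started by the time the SparkSession is built, so the login config also needs to be passed when the JVM launches: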

 

-Djava.security.auth.login.config=/PATH-TO-YOUR-jaas.conf

 

 


Supply the program arguments. My code takes two: the Kafka topic and the Kafka broker(s).

 

sunman my-kafka-broker:9093
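The two arguments map onto the ktargettopic and kbrokers values used in the code above. How the original program reads them is not shown, so the following is only a sketch of one way to do it: the object name Args, the case class KafkaArgs, and the usage message are assumptions.

```scala
object Args {
  // Holds the two values the example code expects, in argument order.
  final case class KafkaArgs(ktargettopic: String, kbrokers: String)

  // Accepts exactly two non-empty arguments: topic first, then broker list.
  def parse(args: Array[String]): Either[String, KafkaArgs] = args match {
    case Array(topic, brokers) if topic.nonEmpty && brokers.nonEmpty =>
      Right(KafkaArgs(topic, brokers))
    case _ =>
      Left("Usage: <kafka-topic> <kafka-broker(s)>")
  }
}
```

In a main method, Args.parse(args) could be matched so that a Left prints the usage message and exits, while a Right supplies ktargettopic and kbrokers to the Spark code.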

 

 

That's it! Run it and enjoy secure Spark Streaming + Kafka glory.