Scala Spark-Streaming with Kafka Integration in Zeppelin not working

Explorer

Hi Community,

I'm trying to set up a simple example of Spark Streaming and Kafka integration in Zeppelin, without success. Any help would be greatly appreciated.

I am following the Apache documentation and the example provided there.

Config details: Ambari-managed HDP 2.6 on a 4-node cluster with Spark 2, Kafka, and HBase

Zeppelin interpreter dependencies for Spark 2: org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1, org.apache.kafka:kafka-clients:0.10.1.1, org.apache.kafka:kafka_2.11:0.10.1.1

I have created 8 messages using the Kafka console producer, so that when I execute the console consumer

./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning

I get 8 messages displayed

^CProcessed a total of 8 messages 
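For completeness, the messages were produced beforehand with the console producer, along these lines (reconstructed, but equivalent to what I ran):

./kafka-console-producer.sh --broker-list vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming

typing value-1 through value-8 at the prompt.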

When I execute the Spark 2 code in Zeppelin,

%spark2
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
sc.setLogLevel("ERROR")  // prevent INFO logging from polluting output
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))  // create the StreamingContext with a 5-second batch interval
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record =>
    System.out.print(record.value())
  }
}
ssc.start()

I get

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a

There are no error messages, but the output from the foreachRDD loop never appears in Zeppelin. I expect to see all 8 messages sitting in the Kafka broker displayed in Zeppelin as follows:

--- New RDD with 1 partitions and 8 records
value-1
value-2
value-3
value-4
value-5
value-6
value-7
value-8

What am I doing wrong?

5 REPLIES

Contributor

I know you want the answer in Zeppelin, but I got it working in spark-shell. Perhaps you can translate the steps over.

Preparation
  1. My HDP 2.6 version: HDP_2.6_vmware_19_04_2017_20_25_43_hdp_ambari_2_5_0_5_1
  2. Download spark-streaming-kafka-0-10_2.11-2.1.0.jar from https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11/2.1.0
  3. ./kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic wordcount
  4. ./kafka-topics.sh --describe --zookeeper sandbox.hortonworks.com:2181 --topic wordcount
Session 1 (spark-shell: Consumer)
./spark-shell --jars /root/spark/spark-streaming-kafka-0-10_2.11-2.1.0.jar,/usr/hdp/2.6.0.3-8/oozie/share/lib/spark/kafka_2.10-0.10.1.2.6.0.3-8.jar,/usr/hdp/2.6.0.3-8/oozie/share/lib/spark/kafka-clients-0.10.1.2.6.0.3-8.jar

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR")  // prevent INFO logging from polluting output
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))  // create the StreamingContext with a 5-second batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "sandbox.hortonworks.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("wordcount")
val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record =>
    System.out.print(record.value())
  }
}

ssc.start()

Session 2: (Producer)
./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic wordcount (then type in a few messages)
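To mirror this setup in Zeppelin rather than spark-shell, the equivalent would presumably be registering the same artifacts as Spark 2 interpreter dependencies instead of passing --jars (untested in Zeppelin on my side):

org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0
org.apache.kafka:kafka_2.11:0.10.1.1
org.apache.kafka:kafka-clients:0.10.1.1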

Explorer

Hi Howchoy,

Thank you for working it out. I could replicate your steps in spark-shell and got it to work. However, when I do the same thing in Zeppelin, it does not work.

I need to enhance the code to use some real data and demonstrate it to the client in graphical form; that is why I need it to work in Zeppelin. Is there a way I can submit the job in Spark and somehow consume the results in Zeppelin without writing them to a database?

Explorer

I am assuming this is a bug in Zeppelin. It's a pity, since Zeppelin is an excellent tool for sales demos. Can someone let me know how to report this bug on the Zeppelin JIRA site?

New Contributor

I ran into the same issue in Zeppelin, using the same example code linked above.

After adding ssc.awaitTermination() at the end, there is output like the one below, but obviously no messages were read.

Putting the same code into the Eclipse Scala IDE, it reads the messages from Kafka and outputs the results correctly... which makes me think it's an issue with the communication between Zeppelin and Kafka.

-------------------------------------------
Time: 1500253145000 ms
-------------------------------------------
-------------------------------------------
Time: 1500253150000 ms
-------------------------------------------
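One variation worth trying: replace the unbounded ssc.awaitTermination() with a bounded wait, so the paragraph finishes after a few batches instead of blocking forever. A minimal sketch; the 30-second timeout is an arbitrary choice:

ssc.start()
ssc.awaitTerminationOrTimeout(30000)  // let batches run for up to 30 s, then return control to the paragraph
ssc.stop(stopSparkContext = false, stopGracefully = true)  // stop the stream but keep the shared SparkContext alive

Whether the messages actually show up is a separate question, but at least the paragraph is not stuck in "running" forever.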

New Contributor

Did anyone solve this? Is there a workaround?

I see it is still open as a major bug:

https://issues.apache.org/jira/browse/ZEPPELIN-2746

I believe I connected to the Kafka topic successfully, since the consumer object was created and Zeppelin sits in "running" mode with no errors. To run it again I have to abort the paragraph, restart the Spark interpreter, and then re-enable it... quite painful.

 

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4d4a1536
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@882f25e
topics: Array[String] = Array(abenesova_topic)
stream: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@7df1173e
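In case it saves someone an interpreter restart: since the streaming context becomes the active one once start() is called, it should be possible to tear it down from a separate Zeppelin paragraph instead of restarting the whole Spark interpreter. A sketch, not verified against ZEPPELIN-2746:

import org.apache.spark.streaming.StreamingContext

// run in its own paragraph: stop the active streaming context gracefully,
// leaving the interpreter's shared SparkContext running
StreamingContext.getActive.foreach(_.stop(stopSparkContext = false, stopGracefully = true))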