Created 07-03-2017 06:29 AM
Hi Community,
I'm trying to set up a simple example of Spark Streaming and Kafka integration in Zeppelin, without success. Any help will be greatly appreciated.
I am following the Apache documentation and the example provided there.
Config details: Ambari-managed HDP 2.6 on a 4-node cluster with Spark 2, Kafka, and HBase
Zeppelin interpreter dependencies for Spark 2: org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1, org.apache.kafka:kafka-clients:0.10.1.1, org.apache.kafka:kafka_2.11:0.10.1.1
I have created 8 messages using the Kafka console producer, such that when I execute the console consumer
./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning
I get 8 messages displayed
^CProcessed a total of 8 messages
When I execute the Spark 2 code in Zeppelin,
%spark2

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR") // prevent INFO logging from polluting output

// create the StreamingContext with a 5-second batch interval
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record => System.out.print(record.value()) }
}

ssc.start()
I get
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a
There are no error messages, but the output from the foreachRDD loop never shows up in Zeppelin. I expect to see all 8 messages sitting in the Kafka broker displayed in Zeppelin as follows:
--- New RDD with 1 partitions and 8 records
value-1 value-2 value-3 value-4 value-5 value-6 value-7 value-8
What am I doing wrong?
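Two things worth checking in the code above: rdd.foreach { record => System.out.print(record.value()) } runs on the executors, so its output goes to the executor logs rather than the notebook, and ssc.start() returns immediately, so the Zeppelin paragraph finishes before the first 5-second batch ever fires. Below is a minimal sketch of a variant that keeps the printing on the driver and blocks the paragraph for a while; the 30-second timeout is an arbitrary choice, not something from this thread.

// Sketch: collect each (small) micro-batch to the driver so the printed values
// land in the notebook output instead of the executor logs.
messages.foreachRDD { rdd =>
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.collect().foreach(record => print(record.value())) // fine for a small demo, not for real volumes
}

ssc.start()
// Keep the paragraph alive long enough for a few 5-second batches to run,
// then hand control back to Zeppelin instead of blocking forever.
ssc.awaitTerminationOrTimeout(30000)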
Created 07-05-2017 08:43 AM
I know you want the answer in Zeppelin, but I got it working in spark-shell. Perhaps you can convert the steps over.
Preparation

Session 1 (spark-shell: Consumer)
./spark-shell --jars /root/spark/spark-streaming-kafka-0-10_2.11-2.1.0.jar,/usr/hdp/2.6.0.3-8/oozie/share/lib/spark/kafka_2.10-0.10.1.2.6.0.3-8.jar,/usr/hdp/2.6.0.3-8/oozie/share/lib/spark/kafka-clients-0.10.1.2.6.0.3-8.jar
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._

sc.setLogLevel("ERROR") // prevent INFO logging from polluting output

// create the StreamingContext with a 5-second batch interval
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5)))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "sandbox.hortonworks.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("wordcount")
val messages = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record => System.out.print(record.value()) }
}

ssc.start()
Session 2 (Producer)

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic wordcount
(Type in something)
Created 07-06-2017 04:41 AM
Hi Howchoy,
Thank you for working it out. I could replicate your steps in spark-shell and got it to work. However, when I do the same thing in Zeppelin, it does not work.
I need to enhance the code to use some real data and demonstrate it to the client in graphical form. That is why I need it to work in Zeppelin. Is there a way I can submit the job in Spark and somehow consume the results in Zeppelin without writing them to a database?
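One pattern that might get you there without a database, sketched under the assumption that Zeppelin's built-in spark (SparkSession) is available in the paragraph: register each micro-batch as a temporary view inside foreachRDD, then chart it from a separate %sql paragraph. The view name kafka_messages is made up for illustration.

import spark.implicits._

messages.foreachRDD { rdd =>
  // re-registered on every batch, so %sql paragraphs always see the latest micro-batch
  val df = rdd.map(record => (record.key, record.value)).toDF("key", "value")
  df.createOrReplaceTempView("kafka_messages")
}

A separate paragraph such as %sql select value, count(*) from kafka_messages group by value can then be re-run on demand and rendered with Zeppelin's built-in charts.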
Created 07-07-2017 11:21 AM
I am assuming this is a bug in Zeppelin. It's a pity, since Zeppelin is an excellent tool for selling purposes. Can someone let me know how to report this bug on the Zeppelin Jira site?
Created 07-17-2017 01:33 AM
I ran into the same issue in Zeppelin when using the same example code that was linked above.
After adding a line ssc.awaitTermination() at the end, I get output like the one below, but obviously no messages are being read.
When I put the same code into the Eclipse Scala IDE, it reads the messages from Kafka and outputs the results correctly... This makes me think it's an issue in the communication between Zeppelin and Kafka.
-------------------------------------------
Time: 1500253145000 ms
-------------------------------------------

-------------------------------------------
Time: 1500253150000 ms
-------------------------------------------
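One hedged way to narrow this down (not something tried in this thread): print the Kafka offset ranges of each batch. The offset ranges are driver-side metadata, so this println shows up in the notebook even when executor output does not. Empty ranges would mean nothing is being consumed at all; non-empty ranges would mean the records arrive but the executor-side printing never reaches Zeppelin.

import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

messages.foreachRDD { rdd =>
  // the direct stream's RDDs implement HasOffsetRanges
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}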
Created 10-11-2019 07:26 AM
Did anyone solve this? Is there a workaround?
I see it is still open as a major bug:
https://issues.apache.org/jira/browse/ZEPPELIN-2746
I believe I connected successfully to the Kafka topic, since the Consumer object was created and Zeppelin stays in "running" mode with no errors. I have to disable the process, restart the Spark interpreter, and then enable the process again... quite painful...
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4d4a1536
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@882f25e
topics: Array[String] = Array(abenesova_topic)
stream: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@7df1173e
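For the restart pain specifically, a sketch that may help, assuming the StreamingContext was started from the same interpreter: stop the active context from another paragraph while keeping the shared SparkContext (and thus the interpreter) alive.

import org.apache.spark.streaming.StreamingContext

// stop whatever StreamingContext is currently active, but leave the SparkContext
// running so the Spark interpreter does not need a restart
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false, stopGracefully = true))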