07-07-2017
11:21 AM
I am assuming this is a bug in Zeppelin. It's a pity, since Zeppelin is an excellent tool for selling purposes. Can someone let me know how to report this bug on the Zeppelin JIRA site?
07-06-2017
04:41 AM
Hi Howchoy, thank you for working it out. I could replicate your steps in the Spark shell and got it to work. However, when I do the same thing in Zeppelin, it does not work. I need to enhance the code to use some real data and demonstrate it to the client in graphical form, which is why I need it to work in Zeppelin. Is there a way I can submit the job in Spark and somehow consume the results in Zeppelin without writing them to a database?
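For reference, one commonly used pattern for surfacing Spark results in Zeppelin without an external database is to register the data as a temporary view and chart it from a %sql paragraph. The sketch below assumes the Zeppelin %spark2 interpreter, where a `spark` SparkSession is already available; the data and the view name "results" are illustrative, not from this thread:

```scala
%spark2
// Expose a DataFrame to Zeppelin's SQL interpreter via a temporary view.
import spark.implicits._

val results = Seq(("value-1", 1), ("value-2", 2)).toDF("message", "count")
results.createOrReplaceTempView("results")

// Then, in a separate %sql paragraph, the data can be queried and charted:
//   SELECT message, count FROM results
```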
07-03-2017
06:29 AM
Hi Community, I'm trying to set up a simple example of Spark Streaming and Kafka integration in Zeppelin, without success. Any help will be greatly appreciated. I am following the Apache documentation and the example provided.

Config details: Ambari-managed HDP 2.6 on a 4-node cluster with Spark 2, Kafka, and HBase.

Zeppelin interpreter dependencies for Spark 2:
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1
- org.apache.kafka:kafka-clients:0.10.1.1
- org.apache.kafka:kafka_2.11:0.10.1.1

I have created 8 messages using the Kafka console producer, so that when I execute the console consumer

./kafka-console-consumer.sh --bootstrap-server vrxhdpkfknod.eastus.cloudapp.azure.com:6667 --topic spark-streaming --from-beginning

I get 8 messages displayed:

^CProcessed a total of 8 messages

When I execute the Spark 2 code in Zeppelin,

%spark2
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
sc.setLogLevel("ERROR") // prevent INFO logging from polluting output
val ssc = StreamingContext.getActiveOrCreate(() => new StreamingContext(sc, Seconds(5))) // creating the StreamingContext with 5 seconds interval
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "vrxhdpkfknod.eastus.cloudapp.azure.com:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka-streaming-example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark-streaming")
val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
messages.foreachRDD { rdd =>
  System.out.println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach { record =>
    System.out.print(record.value())
  }
}
ssc.start()

I get the following output:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming._
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@377213ce
kafkaParams: scala.collection.immutable.Map[String,Object] = Map(key.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer, auto.offset.reset -> earliest, group.id -> kafka-streaming-example, bootstrap.servers -> vrxhdpkfknod.eastus.cloudapp.azure.com:6667, enable.auto.commit -> false, value.deserializer -> class org.apache.kafka.common.serialization.StringDeserializer)
topics: Array[String] = Array(spark-streaming)
messages: org.apache.spark.streaming.dstream.InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String,String]] = org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@328d6f9a

There are no error messages, but none of the output from the foreachRDD loop appears in Zeppelin. I expect to see all 8 messages sitting in the Kafka broker displayed in Zeppelin, as follows:

--- New RDD with 1 partitions and 8 records
value-1
value-2
value-3
value-4
value-5
value-6
value-7
value-8

What am I doing wrong?
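One thing worth checking (an assumption on my part, not something confirmed in this thread): `rdd.foreach { record => print(record.value()) }` runs on the executors, so its output goes to the executor logs rather than to the Zeppelin paragraph, which only shows driver-side output. A sketch of a driver-side variant of the same loop:

```scala
// Pull records back to the driver before printing, so the values appear in
// the Zeppelin paragraph output. collect() is fine for a small demo topic,
// but not for production volumes.
messages.foreachRDD { rdd =>
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.map(_.value()).collect().foreach(println)
}
```

Note also that after `ssc.start()` the paragraph itself finishes immediately; the batches are processed asynchronously, so any driver-side output may arrive after the paragraph reports completion.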
Labels:
- Apache Kafka
- Apache Spark
- Apache Zeppelin