Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

How to integrate or consume data from Kafka into Spark Streaming

Rising Star

Can somebody help? Data from Twitter is now flowing into Kafka successfully. Next I want to process it further in Spark and store it in HDFS. Please provide the Java code for this, or any link I can refer to.

1 ACCEPTED SOLUTION


Hi,

You will find an article here : https://community.hortonworks.com/articles/25726/spark-streaming-explained-kafka-to-phoenix.html

Writing to HDFS can easily be done with RDD.saveAsTextFile(), for example, called on each batch's RDD inside foreachRDD().
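A streaming job produces one RDD per batch, and saveAsTextFile() by default fails if the target directory already exists, so each batch needs its own HDFS directory. DStream.saveAsTextFiles(prefix, suffix) solves this by naming each batch's output "prefix-TIME_IN_MS[.suffix]". The minimal plain-Java sketch below reproduces that naming rule so you can pick per-batch paths in your own foreachRDD code; the class name and the hdfs:///user/tweets/batch prefix are hypothetical.

```java
// Hypothetical helper reproducing the per-batch naming rule that
// DStream.saveAsTextFiles(prefix, suffix) uses: "prefix-TIME_IN_MS[.suffix]".
public class BatchOutputPath {

    // Builds the output directory name for one batch; an empty or null
    // prefix or suffix is simply left out.
    public static String forBatch(String prefix, String suffix, long batchTimeMs) {
        String result = Long.toString(batchTimeMs);
        if (prefix != null && !prefix.isEmpty()) {
            result = prefix + "-" + result;
        }
        if (suffix != null && !suffix.isEmpty()) {
            result = result + "." + suffix;
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical HDFS prefix and batch time (in milliseconds).
        System.out.println(forBatch("hdfs:///user/tweets/batch", "txt", 1458000000000L));
        // prints hdfs:///user/tweets/batch-1458000000000.txt
    }
}
```

Inside foreachRDD, the batch Time's milliseconds() value is the natural input for batchTimeMs, giving every batch a distinct directory.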

HTH


4 REPLIES


Super Guru
@sarfarazkhan pathan I think you can't ignore this guy 🙂 Awesome explanation with code examples:

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

New Member

I agree with @Jitendra Yadav. Michael Noll's blog posts are excellent reading, especially on Kafka.

Guru
@sarfarazkhan pathan

You will need the following in your Maven POM (set VERSION to your Spark version; the _2.10 suffix is the Scala version Spark was built with):

	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming-kafka_2.10</artifactId>
		<version>VERSION</version>
	</dependency>

Then create a stream as follows:

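import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, Integer> kafkaTopics = new HashMap<String, Integer>();
kafkaTopics.put("TopicName", 1); // topic name -> number of consumer threads
SparkConf sparkConf = new SparkConf().setAppName("KafkaToSpark"); // placeholder app name
// batchSize (seconds) and Constants.zkConnString (ZooKeeper quorum, e.g. "zkhost:2181") are defined elsewhere
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(batchSize));
// Receiver-based stream of (key, message) pairs; the key may be null
JavaPairReceiverInputDStream<String, String> kafkaStream =
	KafkaUtils.createStream(jssc, Constants.zkConnString, "spark-streaming-consumer-group", kafkaTopics);
//kafkaStream.print();
JavaPairDStream<String, String> deviceStream = kafkaStream;
// Add transformations and an output operation on deviceStream, then call jssc.start() and jssc.awaitTermination()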
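Two pieces of plumbing usually surround a snippet like this: building the topic map when you consume more than one topic (the Integer is the number of threads used to consume that topic), and keeping only the message part of each (key, message) record before processing or saving it. The plain-Java sketch below shows both under stated assumptions: the class and method names are hypothetical, the comma-separated topic list mirrors the approach of Spark's bundled JavaKafkaWordCount example, and java.util.Map.Entry stands in for Spark's scala.Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for the code around KafkaUtils.createStream.
public class KafkaStreamHelpers {

    // Builds the topic -> consumer-thread-count map from "topicA,topicB".
    // Blank entries (e.g. from "a,,b") are skipped.
    public static Map<String, Integer> topicMap(String csvTopics, int threadsPerTopic) {
        Map<String, Integer> topics = new HashMap<String, Integer>();
        for (String topic : csvTopics.split(",")) {
            String name = topic.trim();
            if (!name.isEmpty()) {
                topics.put(name, threadsPerTopic);
            }
        }
        return topics;
    }

    // Keeps only the message of each (key, message) record, skipping null
    // messages. Map.Entry stands in for scala.Tuple2 in this sketch.
    public static List<String> messages(List<Map.Entry<String, String>> records) {
        List<String> out = new ArrayList<String>();
        for (Map.Entry<String, String> record : records) {
            if (record.getValue() != null) {
                out.add(record.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical topic names; HashMap iteration order is not guaranteed.
        System.out.println(topicMap("tweets, retweets", 1));

        List<Map.Entry<String, String>> batch = new ArrayList<Map.Entry<String, String>>();
        batch.add(new SimpleEntry<String, String>(null, "first tweet"));
        batch.add(new SimpleEntry<String, String>("k1", "second tweet"));
        System.out.println(messages(batch)); // prints [first tweet, second tweet]
    }
}
```

In the streaming job itself, the counterpart of messages() is a map over the pair stream, for example deviceStream.map(t -> t._2()) with Java 8 lambdas, which yields a JavaDStream<String> of message bodies.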