Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

Kafka Spark streaming: unable to get any messages

Expert Contributor
I am using Java to develop a simple Spark Streaming program. I want to read messages from Kafka. I have a topic "test" in my Kafka cluster, and I can read its messages with the console consumer. However, I cannot read them with my program, and I do not get any error either. Here is my code:

import org.apache.spark.streaming.kafka.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;

public class SparkTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(3));

        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, "sandbox.hortonworks.com", "default", map);

        JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
 
Here is what I get when I run the code:
-------------------------------------------
Time: 1455910293000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910296000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910299000 ms
-------------------------------------------

1 ACCEPTED SOLUTION

Master Guru

Just to make sure: when you run a Kafka console consumer against your test topic, do you actually see messages coming in?

Below is the code of the KafkaWordCount example. The parameters of your stream look fine, although I would add the ZooKeeper port (:2181) just to be sure. Spark would normally complain if it couldn't connect to ZooKeeper, but if that complaint is only a warning, you wouldn't see it since you switched off logging.

I also don't trust your map function. It may be doing exactly what it should, but it seems needlessly complicated.

So my suggestions:

- Use the console consumer to verify that data is coming in.

- Turn logging back on.

- Add the ZooKeeper port.

- Replace your map function with the simple .map(_._2) from the WordCount example.

- Print the results and see what happens (a Java sketch of these changes follows the Scala example below).

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
	.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)    
wordCounts.print()
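
For the Java code in the question, a rough sketch of those changes might look like the following. This is only an illustration: it assumes ZooKeeper on the sandbox listens on the default port 2181 and reuses the ssc streaming context, topic "test", and consumer group "default" from the original post.

// Assumption: ZooKeeper is reachable at sandbox.hortonworks.com:2181
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("test", 1); // topic "test" with one receiver thread (the question used 3)

// Same createStream call as in the question, with the ZooKeeper port spelled out
JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181", "default", topicMap);

// Keep only the message value, the Java equivalent of the Scala .map(_._2)
JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});

data.print();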


2 REPLIES



Hello Hoda,

When I run this program I'm getting this error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/13 16:07:48 INFO Remoting: Starting remoting
16/10/13 16:07:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.17.81:40334]

Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.streaming.scheduler.StreamingListenerBus has interface org.apache.spark.scheduler.SparkListener as super class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:54)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:138)
	at SparkTest.main(SparkTest.java:29)

------------------------------------------------------------------------
BUILD FAILURE
------------------------------------------------------------------------
Total time: 4.181s
Finished at: Thu Oct 13 16:07:50 IST 2016
Final Memory: 15M/212M
------------------------------------------------------------------------
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:exec (default-cli) on project SparkPractise: Command execution failed. Process exited with an error: 1 (Exit value: 1) -> [Help 1]
To see the full stack trace of the errors, re-run Maven with the -e switch.
Re-run Maven using the -X switch to enable full debug logging.
For more information about the errors and possible solutions, please read the following articles:
[Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Line 29 of SparkTest.java is:

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

Please help me with this.