
Kafka Spark streaming: unable to get any messages

Expert Contributor

I am using Java to develop a simple Spark Streaming program that reads messages from Kafka. I have a topic "test" in my Kafka, and I can read its messages with the console consumer, but my program gets nothing. I do not get any error either. Here is my code:

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.*;
import scala.Tuple2;

public class SparkTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(3));

        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, "sandbox.hortonworks.com", "default", map);

        JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
 
Here is what I get when I run the code:
-------------------------------------------
Time: 1455910293000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910296000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910299000 ms
-------------------------------------------

1 ACCEPTED SOLUTION

Master Guru

Just to make sure: when you use a Kafka consumer on your test topic, do you see messages coming in?

Below is the code of the KafkaWordCount example; the parameters of your stream look good. I would add the ZooKeeper port (:2181) just to be sure, although Spark would complain if it couldn't connect to ZooKeeper (and if that complaint is only a warning, you wouldn't see it anyway, since you switched off logging).

I also don't trust your function; it might be some Scala magic that does exactly what it should, but it seems needlessly complicated.

So my suggestions:

- Use the console consumer to test whether data comes in.

- Turn logging back on.

- Add the ZooKeeper port.

- Replace your map function with the simple .map(_._2) from the WordCount example (a hedged Java equivalent is sketched after the Scala code below).

- Print the results and see what happens.

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
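
Applied to the Java code from the question, a minimal sketch of those suggestions could look like the following. This is illustrative only: it assumes ZooKeeper is listening on the default port 2181 on sandbox.hortonworks.com, and it keeps the topic name "test" and consumer group "default" from the question. (In the pre-Java-8 API there is no shorter equivalent of the Scala .map(_._2), so the Function stays but is kept to its essentials.)

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class SparkTestSketch {
    public static void main(String[] args) {
        // Leave logging on while debugging so ZooKeeper/Kafka connection
        // warnings stay visible.
        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("test", 3);

        // Explicit ZooKeeper port (assumed to be the default, 2181).
        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181", "default", topics);

        // Java equivalent of the Scala .map(_._2): keep only the message value.
        JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}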


2 REPLIES



Hello Hoda,

When I run this program, I get this error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/13 16:07:48 INFO Remoting: Starting remoting
16/10/13 16:07:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.17.81:40334]

Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.streaming.scheduler.StreamingListenerBus has interface org.apache.spark.scheduler.SparkListener as super class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:54)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:138)
	at SparkTest.main(SparkTest.java:29)

------------------------------------------------------------------------
BUILD FAILURE
------------------------------------------------------------------------
Total time: 4.181s
Finished at: Thu Oct 13 16:07:50 IST 2016
Final Memory: 15M/212M
------------------------------------------------------------------------
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:exec (default-cli) on project SparkPractise: Command execution failed. Process exited with an error: 1 (Exit value: 1) -> [Help 1]
To see the full stack trace of the errors, re-run Maven with the -e switch.
Re-run Maven using the -X switch to enable full debug logging.
For more information about the errors and possible solutions, please read the following articles:
[Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Line 29 of SparkTest.java is:

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

Please help me with this.
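
A hedged note on the error above: java.lang.IncompatibleClassChangeError while constructing the JavaStreamingContext usually indicates that Spark artifacts of different versions (or different Scala builds) are mixed on the classpath, rather than a problem with the code on line 29. As an illustration only, with hypothetical version numbers, the Maven dependencies would all need to target the same Spark release and the same Scala suffix:

<!-- Illustrative pom.xml fragment; the versions shown are hypothetical.
     The point is that every Spark artifact shares one version and one
     Scala suffix. -->
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
  </dependency>
</dependencies>

Running mvn dependency:tree can confirm which Spark versions actually end up on the classpath.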