Created 02-19-2016 07:50 PM
I am using Java to develop a simple Spark Streaming program. I want to read messages from Kafka. I have a topic "test" in my Kafka. I can read the messages with the console consumer; however, I cannot read them with my program, and I do not get any error either. Here is my code:

import org.apache.spark.streaming.kafka.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;

public class SparkTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(3));

        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

        JavaPairReceiverInputDStream<String, String> kafkaStream =
            KafkaUtils.createStream(ssc, "sandbox.hortonworks.com", "default", map);

        JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

Here is what I get when I run the code:

-------------------------------------------
Time: 1455910293000 ms
-------------------------------------------

-------------------------------------------
Time: 1455910296000 ms
-------------------------------------------

-------------------------------------------
Time: 1455910299000 ms
-------------------------------------------
Created 02-20-2016 02:54 PM
Just to make sure: when you use a Kafka consumer on your test topic, do you see messages coming in?
Below is the code of the KafkaWordCount example; the parameters of your stream look fine. I would add the ZooKeeper port number (:2181) just to be sure, although Spark would complain if it could not connect to ZooKeeper. (If that is only logged as a warning, you would not see it, since you switched logging off.)
I also don't trust your map function. It may be some Scala interop magic that does exactly what it should, but it seems needlessly complicated.
So my suggestions:
- Use consumer to test if data comes in
- Turn on logging
- Add zookeeper port
- Replace your map function with the simple .map(_._2) from the Wordcount example
- print those and see what happens.
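For the first check, the console consumer that ships with Kafka can confirm the topic is actually delivering data. A minimal sketch, assuming a default HDP sandbox layout (the script path and the host/port are assumptions; adjust them to your install):

```shell
# Read the "test" topic from the beginning; if producers are working,
# messages should scroll by here. If they do but Spark still prints
# empty batches, the problem is on the Spark side (most likely the
# ZooKeeper connection string).
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
    --zookeeper sandbox.hortonworks.com:2181 \
    --topic test \
    --from-beginning
```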
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
Created 10-13-2016 10:42 AM
Hello Hoda,
When I run this program I'm getting this error:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/13 16:07:48 INFO Remoting: Starting remoting
16/10/13 16:07:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.17.81:40334]

Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.streaming.scheduler.StreamingListenerBus has interface org.apache.spark.scheduler.SparkListener as super class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:54)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:183)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:138)
    at SparkTest.main(SparkTest.java:29)

------------------------------------------------------------------------
BUILD FAILURE
------------------------------------------------------------------------
Total time: 4.181s
Finished at: Thu Oct 13 16:07:50 IST 2016
Final Memory: 15M/212M
------------------------------------------------------------------------
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:exec (default-cli) on project SparkPractise: Command execution failed. Process exited with an error: 1 (Exit value: 1) -> [Help 1]
To see the full stack trace of the errors, re-run Maven with the -e switch. Re-run Maven using the -X switch to enable full debug logging.
For more information about the errors and possible solutions, please read the following article:
[Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Line 29 of SparkTest.java is:
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));
Please help me with this.