<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Kafka Spark streaming: unable to get any messages - Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150448#M20373</link>
    <description>Archived Cloudera community support thread: a Java Spark Streaming program receives no messages from a Kafka "test" topic even though the console consumer does.</description>
    <pubDate>Sat, 20 Feb 2016 03:50:03 GMT</pubDate>
    <dc:creator>hoda_moradi2014</dc:creator>
    <dc:date>2016-02-20T03:50:03Z</dc:date>
    <item>
      <title>Kafka Spark streaming: unable to get any messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150448#M20373</link>
      <description>&lt;PRE&gt;/* I am using Java to develop a simple Spark Streaming program. I want to read messages from Kafka. I have a topic "test" in my Kafka. I can read the messages with the console command, but I cannot read them with my program, and I do not get any error either. Here is my code. */

import org.apache.spark.streaming.kafka.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;

public class SparkTest {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        // Topic name mapped to the number of receiver threads
        Map&amp;lt;String, Integer&amp;gt; map = new HashMap&amp;lt;String, Integer&amp;gt;();
        map.put("test", 3);

        SparkConf conf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));

        JavaPairReceiverInputDStream&amp;lt;String, String&amp;gt; kafkaStream =
            KafkaUtils.createStream(ssc, "sandbox.hortonworks.com", "default", map);

        // Keep only the message value from each (key, value) pair
        JavaDStream&amp;lt;String&amp;gt; data = kafkaStream.map(new Function&amp;lt;Tuple2&amp;lt;String, String&amp;gt;, String&amp;gt;() {
            public String call(Tuple2&amp;lt;String, String&amp;gt; message) {
                return message._2();
            }
        });

        data.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
 
Here is what I get when I run the code:
-------------------------------------------
Time: 1455910293000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910296000 ms
-------------------------------------------
-------------------------------------------
Time: 1455910299000 ms
-------------------------------------------
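
Before digging into the Spark side, it can help to confirm that messages actually reach the topic at all. A console round-trip might look like the following sketch; the script names and the broker/ZooKeeper addresses (6667 and 2181 are common HDP sandbox defaults) are assumptions and may differ in your installation:

```shell
# Send a test message to the "test" topic (broker address is an assumption)
echo "hello from console" | kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

# Read the topic from the beginning; if this prints nothing, the problem
# is upstream of Spark (ZooKeeper address is an assumption)
kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic test --from-beginning
```

If the console consumer prints the message but the Spark job stays silent, the stream parameters (ZooKeeper quorum, consumer group, topic map) are the next thing to check.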

&lt;/PRE&gt;</description>
      <pubDate>Sat, 20 Feb 2016 03:50:03 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150448#M20373</guid>
      <dc:creator>hoda_moradi2014</dc:creator>
      <dc:date>2016-02-20T03:50:03Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Spark streaming: unable to get any messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150449#M20374</link>
      <description>&lt;P&gt;Just to make sure: when you use a Kafka console consumer on your test topic, do you see messages coming in?&lt;/P&gt;&lt;P&gt;Below is the code of the KafkaWordCount example; the parameters of your stream look good. I would add the ZooKeeper port (:2181) just to be sure, although Spark would complain if it couldn't connect to ZooKeeper (if that's only a warning, you wouldn't see it, since you switched off logging).&lt;/P&gt;&lt;P&gt;I also don't trust your map function; it might do exactly what it should, but it seems needlessly complicated.&lt;/P&gt;&lt;P&gt;So my suggestions:&lt;/P&gt;&lt;P&gt;- Use a console consumer to test whether data comes in&lt;/P&gt;&lt;P&gt;- Turn logging back on&lt;/P&gt;&lt;P&gt;- Add the ZooKeeper port&lt;/P&gt;&lt;P&gt;- Replace your map function with the simple .map(_._2) from the WordCount example&lt;/P&gt;&lt;P&gt;- Print the results and see what happens&lt;/P&gt;&lt;PRE&gt;val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x =&amp;gt; (x, 1L))
	.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)    
wordCounts.print()&lt;/PRE&gt;</description>
      <pubDate>Sat, 20 Feb 2016 22:54:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150449#M20374</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-02-20T22:54:21Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka Spark streaming: unable to get any messages</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150450#M20375</link>
      <description>&lt;P&gt;Hello Hoda,&lt;/P&gt;&lt;P&gt;When I run this program I'm getting this error:&lt;/P&gt;&lt;P&gt;Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/13 16:07:48 INFO Remoting: Starting remoting
16/10/13 16:07:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.17.81:40334]&lt;/P&gt;&lt;P&gt;
&lt;STRONG&gt;Exception in thread "main" java.lang.IncompatibleClassChangeError: class org.apache.spark.streaming.scheduler.StreamingListenerBus has interface org.apache.spark.scheduler.SparkListener as super class&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.streaming.scheduler.JobScheduler.&amp;lt;init&amp;gt;(JobScheduler.scala:54)
at org.apache.spark.streaming.StreamingContext.&amp;lt;init&amp;gt;(StreamingContext.scala:183)
at org.apache.spark.streaming.StreamingContext.&amp;lt;init&amp;gt;(StreamingContext.scala:84)
at org.apache.spark.streaming.api.java.JavaStreamingContext.&amp;lt;init&amp;gt;(JavaStreamingContext.scala:138)&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;at SparkTest.main(SparkTest.java:29)&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;------------------------------------------------------------------------
BUILD FAILURE
------------------------------------------------------------------------
Total time: 4.181s
Finished at: Thu Oct 13 16:07:50 IST 2016
Final Memory: 15M/212M
------------------------------------------------------------------------
Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2.1:exec (default-cli) on project SparkPractise: Command execution failed. Process exited with an error: 1 (Exit value: 1) -&amp;gt; [Help 1]
To see the full stack trace of the errors, re-run Maven with the -e switch.
Re-run Maven using the -X switch to enable full debug logging.
For more information about the errors and possible solutions, please read the following articles:
[Help 1] &lt;A href="http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException"&gt;http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Line 29 of SparkTest.java is:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000));&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Please help me with this.&lt;/STRONG&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 13 Oct 2016 17:42:27 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Kafka-Spark-streaming-unable-to-get-any-messages/m-p/150450#M20375</guid>
      <dc:creator>santhosh_kumari</dc:creator>
      <dc:date>2016-10-13T17:42:27Z</dc:date>
    </item>
  </channel>
</rss>

