
How to run the Java Spark Streaming example in HDP 2.3

Contributor

Hi,

I am new to Spark Streaming. I am trying to run the word-count example in Java, with the stream coming from Kafka. I took the example code and built a jar with the required dependencies. When I submit the Spark job, it does not invoke the respective class.

I have used the command below to submit the Spark job on HDP 2.3:

spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn-cluster --num-executors 3 --driver-memory 512 --executor-memory 512 --executor-cores 1 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
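(As later replies in this thread show, the --driver-memory and --executor-memory values above are missing a unit suffix; a corrected form of the same command, assuming the same class and jar path, would be:

spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar)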

My packaged jar contains the following jars (see attached screenshot):

[attached screenshot: 3706-capture.png]

My code is as below:

package com.santo.spark.demo.san_spark_demo;


import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;


import scala.Tuple2;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


public final class JavaKafkaWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");


   private JavaKafkaWordCount() {
   }


   public static void main(String[] args) {
     if (args.length < 4) {
       System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
       System.exit(1);
     }


    // StreamingExamples.setStreamingLogLevels();
     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
     // Create the context with a 2-second batch size
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));


     int numThreads = Integer.parseInt(args[3]);
     Map<String, Integer> topicMap = new HashMap<String, Integer>();
     String[] topics = args[2].split(",");
     for (String topic: topics) {
       topicMap.put(topic, numThreads);
     }


     // Receiver-based Kafka stream; args are (context, zkQuorum, consumer group, topic map)
     JavaPairReceiverInputDStream<String, String> messages =
             KafkaUtils.createStream(jssc, args[0], args[1], topicMap);


     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       
       public String call(Tuple2<String, String> tuple2) {
         return tuple2._2();
       }
     });


     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       
       public Iterable<String> call(String x) {
         // Arrays.asList(...) already implements Iterable<String>; casting its
         // iterator() to Iterable, as the original code did, fails at runtime.
         return Arrays.asList(SPACE.split(x));
       }
     });


     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         
         public Tuple2<String, Integer> call(String s) {
           return new Tuple2<String, Integer>(s, 1);
         }
       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
         
         public Integer call(Integer i1, Integer i2) {
           return i1 + i2;
         }
       });


     wordCounts.print();
     jssc.start();
     jssc.awaitTermination();
   }
 }
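For isolating whether the streaming logic itself works, a minimal local-mode variant of the pipeline can help. The sketch below is not from the original post: it assumes the same Spark 1.x APIs, swaps Kafka for a plain socket source (fed with e.g. nc -lk 9999), and runs with a local master so no YARN or Kafka setup is needed.

package com.santo.spark.demo.san_spark_demo;

import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hypothetical debugging harness, not part of the original post.
public final class JavaLocalWordCountDebug {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    // local[2]: one thread for the socket receiver, one for processing.
    SparkConf conf = new SparkConf()
        .setAppName("JavaLocalWordCountDebug")
        .setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

    // Same word-count pipeline as above, but fed from a local socket.
    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Arrays.asList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}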
1 ACCEPTED SOLUTION

Contributor

Sorry, my bad. I have corrected this. After rebuilding, the jar contains only my files, not any other jars or class files. Attached is the updated POM file: pom.xml
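(For context on why this fixes the ClassNotFoundException below: jars nested inside another jar are not on the JVM classpath, so a "distro" jar that bundles dependency jars can hide the application classes from spark-submit. The attached pom is not reproduced here, but one common alternative, sketched with placeholder jar names, is to build a plain jar of just the application classes and pass the Kafka dependencies separately:

spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --jars <spark-streaming-kafka jar>,<kafka jar> <application jar>)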


17 REPLIES

Contributor

I even tried to check the logs in the job history, and the following appears:

User [dr.who] is not authorized to view the logs for container_e04_1461658451043_0013_02_000001 in log file [sandbox.hortonworks.com_45454]

No logs available for container container_e04_1461658451043_0013_02_000001

I appended &User=spark to the URL and then found details like the below:

Logs not available for container_e04_1461658451043_0013_02_000001. Aggregation may not be complete, Check back later or try the nodemanager at sandbox.hortonworks.com:45454
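(Once aggregation completes, the aggregated logs can also be pulled from the command line as the submitting user; a sketch, assuming the application id matching the container above:

yarn logs -applicationId application_1461658451043_0013)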

Contributor

Nothing appears there. I restarted my sandbox and resubmitted the job, but it is still the same. The YARN log shows the same as what I mentioned above.

Thanks

Contributor

Yes, I am using HDP 2.3.2. One question: do you think my code has any issue? The following is the memory I have allocated to YARN.

My machine has a total of 12 GB RAM installed on Windows, of which I allocated 5792 MB to HDP at import time. Using the YARN config in Ambari, I have updated the node memory to 4608 MB. Please see the attached screenshot. I am not sure if there is something wrong with my code or my config.

Thanks,

Santosh

[attached screenshot: 3739-yarn.png]

Contributor

Thanks again, Emil, for your quick response. I have a doubt about the memory theory: I was able to run the example code from the examples directory, which is in Scala, so I am guessing this is not a memory problem. Do I need to change any config or set any path since my code is in Java, or is my code incomplete?

Contributor

Yes, it was with 3 executors. I just replaced the class name and jar name when submitting the Spark job.

Contributor

Could you please suggest how to run this in the Spark shell? I will let you know after running it with --master yarn.

Contributor

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$

Contributor

Still no luck, Emil :(

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 768 --executor-memory 768 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1024 --executor-memory 1024 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1500 --executor-memory 1500 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 2100 --executor-memory 2100 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
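(A likely reading of this error: without a unit suffix, the value reaches the driver JVM as a raw byte count, so --driver-memory 512 becomes -Xmx512, i.e. 512 bytes, which the JVM rejects with "Too small initial heap" no matter how large the bare number gets. For illustration:

--driver-memory 512  -> -Xmx512  (512 bytes: "Too small initial heap")
--driver-memory 512m -> -Xmx512m (512 MiB)

The next reply confirms that adding the m suffix gets past this error.)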

Contributor

Now it says 😞

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar

java.lang.ClassNotFoundException: com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
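(This matches the accepted solution above: the class never made it into the jar as a loadable .class file. One quick check, assuming the same jar path, is to list the jar contents:

jar tf /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar | grep JavaKafkaWordCount

This should print com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount.class; if it prints nothing, or the jar contains only nested dependency jars, the packaging rather than the code is the problem.)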