Created on 04-26-2016 11:21 AM - edited 08-19-2019 03:50 AM
Hi,
I am new to Spark Streaming. I am trying to run the word count example in Java, with the stream coming from Kafka. I took the example code as-is and built a jar with the required dependencies. When I submit the Spark job, it does not invoke the specified class.
I used the command below to submit the Spark job on HDP 2.3:
spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn-cluster --num-executors 3 --driver-memory 512 --executor-memory 512 --executor-cores 1 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
My packaged jar contains the following jars:
My code is below:
package com.santo.spark.demo.san_spark_demo;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    // StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with a 2-second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        // Return the list itself; casting an Iterator to Iterable fails at runtime.
        return Arrays.asList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}
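As a side check, the transformation chain in the job (split on spaces, map each word to a (word, 1) pair, reduce by key) can be exercised without Spark at all. This is a minimal sketch in plain Java, using the same SPACE pattern; the class name WordCountCheck is made up for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public final class WordCountCheck {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Same logic as the streaming job, applied to one batch of lines.
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // flatMap step: Arrays.asList(...) is already an Iterable,
            // so no iterator()/cast is needed.
            for (String word : Arrays.asList(SPACE.split(line))) {
                // mapToPair + reduceByKey steps collapsed into one merge.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("to be or", "not to be"));
        System.out.println(counts.get("to")); // 2
        System.out.println(counts.get("be")); // 2
        System.out.println(counts.get("or")); // 1
    }
}
```

If this local version behaves as expected, any remaining problem is in packaging or submission rather than in the word count logic itself.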
Created 04-29-2016 09:20 AM
Sorry, my bad. I have corrected this. After rebuilding, the jar contains only my files, not any other jars or class files. Attached is the updated pom file: pom.xml
Created 04-27-2016 03:25 AM
I even checked the job history logs, and the following appears:
I appended the URL with &User=spark and then found details like the below:
Created 04-27-2016 02:52 PM
Nothing appears there. I restarted my sandbox and resubmitted the job, but it is still the same. The YARN log shows the same as what I mentioned above.
Thanks
Created on 04-28-2016 06:34 AM - edited 08-19-2019 03:50 AM
Yes, I am using HDP 2.3.2. One question: do you think my code has any issue? The following is the memory I have allocated to YARN.
My machine has a total of 12 GB of RAM installed on Windows, of which I allocated 5792 MB to HDP at import time. Using the YARN config in Ambari, I updated the node memory to 4608 MB. Please see the attachment. I am not sure whether something is wrong with my code or my config.
Thanks,
Santosh
Created 04-28-2016 07:03 AM
Thanks again, Emil, for your quick response. I have a doubt about the memory theory: I was able to run the example code from the examples directory, which is in Scala, so I am guessing this is not a memory problem. Do I need to change any config or set any path because my code is in Java, or is my code incomplete?
Created 04-28-2016 07:42 AM
Yes, it was with 3 executors. I just replaced the class name and jar name when submitting the Spark job.
Created 04-28-2016 08:58 AM
Could you please suggest how to run this on the Spark shell? I will let you know after running it with --master yarn.
Created 04-28-2016 09:01 AM
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$
Created 04-28-2016 09:50 AM
Still no luck, Emil :(
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 768 --executor-memory 768 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1024 --executor-memory 1024 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1500 --executor-memory 1500 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 2100 --executor-memory 2100 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
Error occurred during initialization of VM
Too small initial heap
Created 04-28-2016 11:14 AM
Now it says 😞
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
java.lang.ClassNotFoundException: com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:634)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
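For what it's worth, a ClassNotFoundException at this point in spark-submit means the driver JVM could not find the named class in any jar on its classpath, which usually indicates the class was not packaged into the submitted jar. The lookup Spark performs boils down to Class.forName; a small illustration (the missing class name here is hypothetical):

```java
public class ClassLookupDemo {
    public static void main(String[] args) {
        // A class present on the classpath resolves fine.
        try {
            Class<?> c = Class.forName("java.util.HashMap");
            System.out.println("found " + c.getName());
        } catch (ClassNotFoundException e) {
            System.out.println("unexpected: " + e);
        }
        // A class absent from every jar on the classpath fails the same
        // way the submitted job did.
        try {
            Class.forName("com.example.missing.NoSuchClass");
            System.out.println("should not happen");
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

In practice, listing the jar contents (for example with `jar tf` on the built jar) and checking that the JavaKafkaWordCount .class entry is present is the quickest way to confirm.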