Created on 04-26-2016 11:21 AM - edited 08-19-2019 03:50 AM
Hi,
I am new to spark streaming , I am trying to run wordcount example using java, the streams comes from kafka. I took the example code which was there and built jar with required dependencies. When I am submitting the spark job it does not call the respective class file.
I have used below command to submit spark job on HDP2.3-
spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn-cluster --num-executors 3 --driver-memory 512 --executor-memory 512 --executor-cores 1 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
My packaged jar contains following jar-
My code as below-
package com.santo.spark.demo.san_spark_demo;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
public final class JavaKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
private JavaKafkaWordCount() {
}
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
// StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return (Iterable<String>) Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
Created 04-29-2016 09:20 AM
sorry, my bad. I have corrected this. After build again jar contains only mine files, not any other jar or class files. Attached is updated pom file.pom.xml
Created 04-27-2016 03:25 AM
Even tried to check the logs of jobhistory and this appears as below-
Appended URL with &User=spark and then found details like below-
Created 04-27-2016 02:52 PM
Nothing appears there. I restarted my sandbox and resubmitted the job but still same case.Yarn log shows same what I have mentioned above.
Thanks
Created on 04-28-2016 06:34 AM - edited 08-19-2019 03:50 AM
Yes, I am using HDP2.3.2, one question do you think my code has any issue? Following is the memory which I have allocated to yarn.
My machine has total of 12 gb RAM installed on windows, where as I have allocated 5792 to HDP at the time of importing. and using yarn config using ambari I have updated node memory to 4608 mb. Please see the attached. I am not sure if there is something wrong with my code or config.
Thanks,
Santosh
Created 04-28-2016 07:03 AM
Thanks Emil again for your quick response. I have a doubt about memory, I was able to run the example code from the examples directory which is in scala, so guessing this could not be a problem of memory, do I need to change any config and put any path as my code in java or or is my code is in-complete.
Created 04-28-2016 07:42 AM
yes, it was with 3 executors. I just replaced class name and jar name at the time submitting the spark job
Created 04-28-2016 08:58 AM
Could you pls suggest how to run on spark shell? I will let you know after running this with --master yarn.
Created 04-28-2016 09:01 AM
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap [spark@sandbox bin]$
Created 04-28-2016 09:50 AM
still no luck Emil:(
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512 --executor-memory 512 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap [spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 768 --executor-memory 768 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap [spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1024 --executor-memory 1024 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap [spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 1500 --executor-memory 1500 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap [spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 2100 --executor-memory 2100 --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar Error occurred during initialization of VM Too small initial heap
Created 04-28-2016 11:14 AM
now it says 😞
[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
java.lang.ClassNotFoundException: com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:634) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)