
How to run the Spark Streaming Java example on HDP 2.3

Contributor

Hi,

I am new to Spark Streaming. I am trying to run a word-count example in Java, with the stream coming from Kafka. I took the example code and built a jar with the required dependencies, but when I submit the Spark job it does not invoke the respective class.

I have used the command below to submit the Spark job on HDP 2.3:

spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar

My packaged jar contains the following jars:

(Attached screenshot of the jar contents: 3706-capture.png)

My code is as follows:

package com.santo.spark.demo.san_spark_demo;


import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;


import scala.Tuple2;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


public final class JavaKafkaWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");


   private JavaKafkaWordCount() {
   }


   public static void main(String[] args) {
     if (args.length < 4) {
       System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
       System.exit(1);
     }


    // StreamingExamples.setStreamingLogLevels();
     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
     // Create the context with 2 seconds batch size
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));


     int numThreads = Integer.parseInt(args[3]);
     Map<String, Integer> topicMap = new HashMap<>();
     String[] topics = args[2].split(",");
     for (String topic: topics) {
       topicMap.put(topic, numThreads);
     }


     JavaPairReceiverInputDStream<String, String> messages =
             KafkaUtils.createStream(jssc, args[0], args[1], topicMap);


     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       
       public String call(Tuple2<String, String> tuple2) {
         return tuple2._2();
       }
     });


     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

       public Iterable<String> call(String x) {
         // Return the list itself; casting its Iterator to Iterable would throw a ClassCastException at runtime.
         return Arrays.asList(SPACE.split(x));
       }
     });


     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() {
         
         public Tuple2<String, Integer> call(String s) {
           return new Tuple2<>(s, 1);
         }
       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
         
         public Integer call(Integer i1, Integer i2) {
           return i1 + i2;
         }
       });


     wordCounts.print();
     jssc.start();
     jssc.awaitTermination();
   }
 }
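
Note: main() exits immediately unless four program arguments (zkQuorum, group, topics, numThreads) are supplied, and the spark-submit command above passes none. A sketch of a full invocation, with placeholder argument values that would need adjusting for your environment:

spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount \
  --master yarn-cluster --num-executors 3 --driver-memory 512m \
  --executor-memory 512m --executor-cores 1 \
  /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar \
  sandbox.hortonworks.com:2181 wordcount-group test 1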
1 ACCEPTED SOLUTION

Contributor

Sorry, my bad. I have corrected this. After rebuilding, the jar contains only my own files, not any other jars or class files. The updated pom file is attached: pom.xml
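
(For readers without the attachment: a common way to get a runnable fat jar, instead of a distro jar with nested jars that the JVM classloader cannot read, is the assembly plugin's built-in jar-with-dependencies descriptor. This is a minimal sketch, not necessarily what the attached pom does:)

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- unpacks all compile/runtime dependencies into one flat jar -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>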


17 REPLIES

Contributor

Now I am getting a ClassNotFoundException :(

[spark@sandbox bin]$ spark-submit --class com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount --master yarn --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 2 /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
java.lang.ClassNotFoundException: com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:278)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Also, can you tell me the difference between running with --master yarn and --master yarn-cluster? For your information, I am using the HDP 2.3 sandbox.

Contributor

Hey Emil,

Here are the jar contents:

[root@sandbox ~]# jar -tvf /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar
       0 Mon Apr 04 16:29:22 UTC 2016 META-INF/
     202 Mon Apr 04 16:29:20 UTC 2016 META-INF/MANIFEST.MF
       0 Mon Apr 04 16:29:22 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/
       0 Mon Apr 04 16:29:22 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/
   38744 Mon Apr 04 11:38:10 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/spark-streaming-kafka_2.10-1.1.0.jar
 2557388 Thu Mar 31 12:15:08 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/kafka_2.10-0.8.0.jar
 7137903 Wed Dec 16 17:05:44 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/scala-library-2.10.1.jar
  391834 Wed Dec 16 17:05:14 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/log4j-1.2.15.jar
  388864 Wed Dec 16 17:05:18 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/mail-1.4.jar
   62983 Mon Dec 07 13:10:56 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/activation-1.1.jar
14384128 Thu Mar 31 12:16:14 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/scala-compiler-2.10.1.jar
 3178531 Mon Apr 04 13:15:50 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/scala-reflect-2.10.1.jar
   64009 Mon Dec 07 13:36:42 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/zkclient-0.3.jar
  995968 Mon Dec 07 13:11:18 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/snappy-java-1.0.4.1.jar
   82123 Mon Dec 07 13:36:38 UTC 2015 san-spark_demo-0.0.1-SNAPSHOT/lib/metrics-core-2.2.0.jar
   26083 Mon Apr 04 15:38:02 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/slf4j-api-1.7.2.jar
    4229 Thu Mar 31 12:16:14 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/metrics-annotation-2.2.0.jar
    8811 Mon Apr 04 16:29:16 UTC 2016 san-spark_demo-0.0.1-SNAPSHOT/lib/san-spark_demo-0.0.1-SNAPSHOT.jar
[root@sandbox ~]#
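
(Side note: the listing above contains only nested jars under san-spark_demo-0.0.1-SNAPSHOT/lib/; the application classes sit inside the nested san-spark_demo-0.0.1-SNAPSHOT.jar, where the JVM classloader cannot see them. A quick way to check whether the main class is visible at the top level of a jar:)

jar -tf /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT-distro.jar | grep JavaKafkaWordCount.class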

I am going to modify my pom and execute again; I will let you know the outcome.

Thank you very much for your help and support.

Contributor

You are very correct, Emil. After using your POM structure I am able to build, and spark-submit at least started hitting my class. But it failed with Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils, as this class is not in any jar on the classpath. What should I change in the pom.xml to include it?

16/04/29 07:11:09 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> sandbox.hortonworks.com, PROXY_URI_BASES -> http://sandbox.hortonworks.com:8088/proxy/application_1461754120456_0004), /proxy/application_1461754120456_0004
16/04/29 07:11:09 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/04/29 07:11:10 INFO Client: Application report for application_1461754120456_0004 (state: RUNNING)
16/04/29 07:11:10 INFO Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 10.0.2.15
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1461913855488
         final status: UNDEFINED
         tracking URL: http://sandbox.hortonworks.com:8088/proxy/application_1461754120456_0004/
         user: spark
16/04/29 07:11:10 INFO YarnClientSchedulerBackend: Application application_1461754120456_0004 has started running.
16/04/29 07:11:10 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51933.
16/04/29 07:11:10 INFO NettyBlockTransferService: Server created on 51933
16/04/29 07:11:10 INFO BlockManagerMaster: Trying to register BlockManager
16/04/29 07:11:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:51933 with 265.4 MB RAM, BlockManagerId(driver, 10.0.2.15, 51933)
16/04/29 07:11:10 INFO BlockManagerMaster: Registered BlockManager
16/04/29 07:11:11 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount.main(JavaKafkaWordCount.java:47)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 10 more
16/04/29 07:11:23 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@sandbox.hortonworks.com:39447/user/Executor#1317453705]) with ID 1
16/04/29 07:11:24 INFO BlockManagerMasterEndpoint: Registering block manager sandbox.hortonworks.com:46149 with 265.4 MB RAM, BlockManagerId(1, sandbox.hortonworks.com, 46149)


The following is the output of jar -tvf after building with the latest pom:

[root@sandbox ~]# jar -tvf /mnt/share/applications/san-spark_demo-0.0.1-SNAPSHOT.jar
    0 Fri Apr 29 12:53:06 UTC 2016 META-INF/
  140 Fri Apr 29 12:53:04 UTC 2016 META-INF/MANIFEST.MF
    0 Fri Apr 29 12:53:06 UTC 2016 com/
    0 Fri Apr 29 12:53:06 UTC 2016 com/santo/
    0 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/
    0 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/
    0 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/
    0 Fri Apr 29 12:52:40 UTC 2016 META-INF/maven/
    0 Fri Apr 29 12:52:40 UTC 2016 META-INF/maven/com.santo.spark.demo/
    0 Fri Apr 29 12:52:40 UTC 2016 META-INF/maven/com.santo.spark.demo/san-spark_demo/
 1287 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount$1.class
 1438 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount$2.class
 1180 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount$3.class
 1265 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount$4.class
 4197 Fri Apr 29 12:53:06 UTC 2016 com/santo/spark/demo/san_spark_demo/JavaKafkaWordCount.class
  274 Fri Apr 29 12:52:40 UTC 2016 META-INF/maven/com.santo.spark.demo/san-spark_demo/pom.properties
 3325 Fri Apr 29 12:52:40 UTC 2016 META-INF/maven/com.santo.spark.demo/san-spark_demo/pom.xml
 3325 Fri Apr 29 12:42:28 UTC 2016 META-INF/maven/com.santo.spark.demo/san-spark_demo/pom.xml
  133 Fri Apr 29 12:53:06 UTC 2016 META-INF/maven/com.santo.spark.demo/san-spark_demo/pom.properties

Could you please check the attached pom.xml and package-config.xml and suggest what I am missing to include the KafkaUtils class as part of the jar?
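
(For reference: KafkaUtils lives in the spark-streaming-kafka artifact, which is not part of the Spark assembly installed on the cluster, so it has to be bundled into the application jar or passed with --jars at submit time. Assuming Spark 1.3.1 and Scala 2.10 to match HDP 2.3, the Maven dependency would look like this:)

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.3.1</version>
</dependency>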

Contributor

I have added a last dependency for KafkaUtils in both xml files, but that seems wrong; please ignore it and suggest whether I need to modify anything in the two attached files.

Contributor

I already followed the same approach, except that instead of finaljar.xml I named the file package-config.assembly, which has the same contents as finaljar.xml. But I am unable to decide what else I need to add there to pull in the dependent jars.
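
(For readers without the attachments: an assembly descriptor that unpacks dependency classes into one flat jar, rather than copying whole jars into a lib/ directory, generally looks something like the sketch below. This is illustrative only, not the actual attached file:)

<assembly>
    <!-- produces *-distro.jar with all runtime dependency classes unpacked inline -->
    <id>distro</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <outputDirectory>/</outputDirectory>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>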


Contributor

Oh, got it. I am using STS to build and package, but after some modification to the pom I am able to get the same result you suggested. Below is my update to pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <descriptor>src/main/assembly/package-config.xml</descriptor>
                <archive>
                    <manifest>
                        <mainClass>com.santo.spark.demo.san_spark_demo.JavaKafkaWordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

So, in the above, after adding the executions tag the build actually makes use of the assembly plugin; earlier it did not. Now I am also able to get the jar, which is approximately 13 MB. Using this jar I can see my Spark job gets submitted, and a batch is created every 2 seconds. I still have to run my Kafka producer and push some text to see the counts (see the producer sketch after the log below).

-------------------------------------------
Time: 1461933986000 ms
-------------------------------------------


16/04/29 12:46:26 INFO JobScheduler: Finished job streaming job 1461933986000 ms.0 from job set of time 1461933986000 ms
16/04/29 12:46:26 INFO JobScheduler: Total delay: 0.207 s for time 1461933986000 ms (execution: 0.183 s)
16/04/29 12:46:26 INFO ShuffledRDD: Removing RDD 58 from persistence list
16/04/29 12:46:26 INFO MapPartitionsRDD: Removing RDD 57 from persistence list
16/04/29 12:46:26 INFO MapPartitionsRDD: Removing RDD 56 from persistence list
16/04/29 12:46:26 INFO BlockManager: Removing RDD 58
16/04/29 12:46:26 INFO BlockManager: Removing RDD 57
16/04/29 12:46:26 INFO BlockManager: Removing RDD 56
16/04/29 12:46:26 INFO MapPartitionsRDD: Removing RDD 55 from persistence list
16/04/29 12:46:26 INFO BlockManager: Removing RDD 55
16/04/29 12:46:26 INFO BlockRDD: Removing RDD 54 from persistence list
16/04/29 12:46:26 INFO BlockManager: Removing RDD 54
16/04/29 12:46:26 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[54] at createStream at JavaKafkaWordCount.java:47 of time 1461933986000 ms
16/04/29 12:46:26 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1461933982000 ms)
16/04/29 12:46:26 INFO InputInfoTracker: remove old batch metadata: 1461933982000 ms
16/04/29 12:46:28 INFO JobScheduler: Starting job streaming job 1461933988000 ms.0 from job set of time 1461933988000 ms
16/04/29 12:46:28 INFO SparkContext: Starting job: print at JavaKafkaWordCount.java:76
16/04/29 12:46:28 INFO JobScheduler: Added jobs for time 1461933988000 ms
16/04/29 12:46:28 INFO DAGScheduler: Registering RDD 67 (mapToPair at JavaKafkaWordCount.java:63)
16/04/29 12:46:28 INFO DAGScheduler: Got job 26 (print at JavaKafkaWordCount.java:76) with 1 output partitions (allowLocal=true)
16/04/29 12:46:28 INFO DAGScheduler: Final stage: ResultStage 52(print at JavaKafkaWordCount.java:76)
16/04/29 12:46:28 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 51)
16/04/29 12:46:28 INFO DAGScheduler: Missing parents: List()
16/04/29 12:46:28 INFO DAGScheduler: Submitting ResultStage 52 (ShuffledRDD[68] at reduceByKey at JavaKafkaWordCount.java:69), which has no missing parents
16/04/29 12:46:28 INFO MemoryStore: ensureFreeSpace(2456) called with curMem=138348, maxMem=278302556
16/04/29 12:46:28 INFO MemoryStore: Block broadcast_27 stored as values in memory (estimated size 2.4 KB, free 265.3 MB)
16/04/29 12:46:28 INFO MemoryStore: ensureFreeSpace(1520) called with curMem=140804, maxMem=278302556
16/04/29 12:46:28 INFO MemoryStore: Block broadcast_27_piece0 stored as bytes in memory (estimated size 1520.0 B, free 265.3 MB)
16/04/29 12:46:28 INFO BlockManagerInfo: Added broadcast_27_piece0 in memory on 10.0.2.15:60441 (size: 1520.0 B, free: 265.4 MB)
16/04/29 12:46:28 INFO SparkContext: Created broadcast 27 from broadcast at DAGScheduler.scala:874
16/04/29 12:46:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 52 (ShuffledRDD[68] at reduceByKey at JavaKafkaWordCount.java:69)
16/04/29 12:46:28 INFO YarnScheduler: Adding task set 52.0 with 1 tasks
16/04/29 12:46:28 INFO TaskSetManager: Starting task 0.0 in stage 52.0 (TID 98, sandbox.hortonworks.com, PROCESS_LOCAL, 1249 bytes)
16/04/29 12:46:28 INFO BlockManagerInfo: Added broadcast_27_piece0 in memory on sandbox.hortonworks.com:53604 (size: 1520.0 B, free: 265.4 MB)
16/04/29 12:46:28 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 13 to sandbox.hortonworks.com:57213
16/04/29 12:46:28 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 13 is 82 bytes
16/04/29 12:46:28 INFO TaskSetManager: Finished task 0.0 in stage 52.0 (TID 98) in 60 ms on sandbox.hortonworks.com (1/1)
16/04/29 12:46:28 INFO YarnScheduler: Removed TaskSet 52.0, whose tasks have all completed, from pool
16/04/29 12:46:28 INFO DAGScheduler: ResultStage 52 (print at JavaKafkaWordCount.java:76) finished in 0.060 s
16/04/29 12:46:28 INFO DAGScheduler: Job 26 finished: print at JavaKafkaWordCount.java:76, took 0.088915 s
16/04/29 12:46:28 INFO SparkContext: Starting job: print at JavaKafkaWordCount.java:76
16/04/29 12:46:28 INFO DAGScheduler: Got job 27 (print at JavaKafkaWordCount.java:76) with 1 output partitions (allowLocal=true)
16/04/29 12:46:28 INFO DAGScheduler: Final stage: ResultStage 54(print at JavaKafkaWordCount.java:76)
16/04/29 12:46:28 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 53)
16/04/29 12:46:28 INFO DAGScheduler: Missing parents: List()
16/04/29 12:46:28 INFO DAGScheduler: Submitting ResultStage 54 (ShuffledRDD[68] at reduceByKey at JavaKafkaWordCount.java:69), which has no missing parents
16/04/29 12:46:28 INFO MemoryStore: ensureFreeSpace(2456) called with curMem=142324, maxMem=278302556
16/04/29 12:46:28 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 2.4 KB, free 265.3 MB)
16/04/29 12:46:28 INFO MemoryStore: ensureFreeSpace(1520) called with curMem=144780, maxMem=278302556
16/04/29 12:46:28 INFO MemoryStore: Block broadcast_28_piece0 stored as bytes in memory (estimated size 1520.0 B, free 265.3 MB)
16/04/29 12:46:28 INFO BlockManagerInfo: Added broadcast_28_piece0 in memory on 10.0.2.15:60441 (size: 1520.0 B, free: 265.4 MB)
16/04/29 12:46:28 INFO SparkContext: Created broadcast 28 from broadcast at DAGScheduler.scala:874
16/04/29 12:46:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 54 (ShuffledRDD[68] at reduceByKey at JavaKafkaWordCount.java:69)
16/04/29 12:46:28 INFO YarnScheduler: Adding task set 54.0 with 1 tasks
16/04/29 12:46:28 INFO TaskSetManager: Starting task 0.0 in stage 54.0 (TID 99, sandbox.hortonworks.com, PROCESS_LOCAL, 1249 bytes)
16/04/29 12:46:28 INFO BlockManagerInfo: Added broadcast_28_piece0 in memory on sandbox.hortonworks.com:53604 (size: 1520.0 B, free: 265.4 MB)
16/04/29 12:46:28 INFO TaskSetManager: Finished task 0.0 in stage 54.0 (TID 99) in 40 ms on sandbox.hortonworks.com (1/1)
16/04/29 12:46:28 INFO YarnScheduler: Removed TaskSet 54.0, whose tasks have all completed, from pool
16/04/29 12:46:28 INFO DAGScheduler: ResultStage 54 (print at JavaKafkaWordCount.java:76) finished in 0.040 s
16/04/29 12:46:28 INFO DAGScheduler: Job 27 finished: print at JavaKafkaWordCount.java:76, took 0.062382 s
-------------------------------------------
Time: 1461933988000 ms
-------------------------------------------


16/04/29 12:46:28 INFO JobScheduler: Finished job streaming job 1461933988000 ms.0 from job set of time 1461933988000 ms
16/04/29 12:46:28 INFO JobScheduler: Total delay: 0.184 s for time 1461933988000 ms (execution: 0.164 s)
16/04/29 12:46:28 INFO ShuffledRDD: Removing RDD 63 from persistence list
16/04/29 12:46:28 INFO BlockManager: Removing RDD 63
16/04/29 12:46:28 INFO MapPartitionsRDD: Removing RDD 62 from persistence list
16/04/29 12:46:28 INFO BlockManager: Removing RDD 62
16/04/29 12:46:28 INFO MapPartitionsRDD: Removing RDD 61 from persistence list
16/04/29 12:46:28 INFO BlockManager: Removing RDD 61
16/04/29 12:46:28 INFO MapPartitionsRDD: Removing RDD 60 from persistence list
16/04/29 12:46:28 INFO BlockManager: Removing RDD 60
16/04/29 12:46:28 INFO BlockRDD: Removing RDD 59 from persistence list
16/04/29 12:46:28 INFO BlockManager: Removing RDD 59
16/04/29 12:46:28 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at JavaKafkaWordCount.java:47 of time 1461933988000 ms
16/04/29 12:46:28 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1461933984000 ms)
16/04/29 12:46:28 INFO InputInfoTracker: remove old batch metadata: 1461933984000 ms
16/04/29 12:46:30 INFO JobScheduler: Starting job streaming job 1461933990000 ms.0 from job set of time 1461933990000 ms
16/04/29 12:46:30 INFO JobScheduler: Added jobs for time 1461933990000 ms
16/04/29 12:46:30 INFO SparkContext: Starting job: print at JavaKafkaWordCount.java:76
16/04/29 12:46:30 INFO DAGScheduler: Registering RDD 72 (mapToPair at JavaKafkaWordCount.java:63)
16/04/29 12:46:30 INFO DAGScheduler: Got job 28 (print at JavaKafkaWordCount.java:76) with 1 output partitions (allowLocal=true)
16/04/29 12:46:30 INFO DAGScheduler: Final stage: ResultStage 56(print at JavaKafkaWordCount.java:76)
16/04/29 12:46:30 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 55)
16/04/29 12:46:30 INFO DAGScheduler: Missing parents: List()
16/04/29 12:46:30 INFO DAGScheduler: Submitting ResultStage 56 (ShuffledRDD[73] at reduceByKey at JavaKafkaWordCount.java:69), which has no missing parents
16/04/29 12:46:30 INFO MemoryStore: ensureFreeSpace(2456) called with curMem=146300, maxMem=278302556
16/04/29 12:46:30 INFO MemoryStore: Block broadcast_29 stored as values in memory (estimated size 2.4 KB, free 265.3 MB)
16/04/29 12:46:30 INFO MemoryStore: ensureFreeSpace(1521) called with curMem=148756, maxMem=278302556
16/04/29 12:46:30 INFO MemoryStore: Block broadcast_29_piece0 stored as bytes in memory (estimated size 1521.0 B, free 265.3 MB)
16/04/29 12:46:30 INFO BlockManagerInfo: Added broadcast_29_piece0 in memory on 10.0.2.15:60441 (size: 1521.0 B, free: 265.4 MB)
16/04/29 12:46:30 INFO SparkContext: Created broadcast 29 from broadcast at DAGScheduler.scala:874
16/04/29 12:46:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 56 (ShuffledRDD[73] at reduceByKey at JavaKafkaWordCount.java:69)
16/04/29 12:46:30 INFO YarnScheduler: Adding task set 56.0 with 1 tasks
16/04/29 12:46:30 INFO TaskSetManager: Starting task 0.0 in stage 56.0 (TID 100, sandbox.hortonworks.com, PROCESS_LOCAL, 1249 bytes)
16/04/29 12:46:30 INFO BlockManagerInfo: Added broadcast_29_piece0 in memory on sandbox.hortonworks.com:53604 (size: 1521.0 B, free: 265.4 MB)
16/04/29 12:46:30 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 14 to sandbox.hortonworks.com:57213
16/04/29 12:46:30 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 14 is 82 bytes
16/04/29 12:46:30 INFO TaskSetManager: Finished task 0.0 in stage 56.0 (TID 100) in 65 ms on sandbox.hortonworks.com (1/1)
16/04/29 12:46:30 INFO YarnScheduler: Removed TaskSet 56.0, whose tasks have all completed, from pool
16/04/29 12:46:30 INFO DAGScheduler: ResultStage 56 (print at JavaKafkaWordCount.java:76) finished in 0.058 s
16/04/29 12:46:30 INFO DAGScheduler: Job 28 finished: print at JavaKafkaWordCount.java:76, took 0.085238 s
16/04/29 12:46:30 INFO SparkContext: Starting job: print at JavaKafkaWordCount.java:76
16/04/29 12:46:30 INFO DAGScheduler: Got job 29 (print at JavaKafkaWordCount.java:76) with 1 output partitions (allowLocal=true)
16/04/29 12:46:30 INFO DAGScheduler: Final stage: ResultStage 58(print at JavaKafkaWordCount.java:76)
16/04/29 12:46:30 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 57)
16/04/29 12:46:30 INFO DAGScheduler: Missing parents: List()
16/04/29 12:46:30 INFO DAGScheduler: Submitting ResultStage 58 (ShuffledRDD[73] at reduceByKey at JavaKafkaWordCount.java:69), which has no missing parents
16/04/29 12:46:30 INFO MemoryStore: ensureFreeSpace(2456) called with curMem=150277, maxMem=278302556
16/04/29 12:46:30 INFO MemoryStore: Block broadcast_30 stored as values in memory (estimated size 2.4 KB, free 265.3 MB)
16/04/29 12:46:30 INFO MemoryStore: ensureFreeSpace(1521) called with curMem=152733, maxMem=278302556
16/04/29 12:46:30 INFO MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 1521.0 B, free 265.3 MB)
16/04/29 12:46:30 INFO BlockManagerInfo: Added broadcast_30_piece0 in memory on 10.0.2.15:60441 (size: 1521.0 B, free: 265.4 MB)
16/04/29 12:46:30 INFO SparkContext: Created broadcast 30 from broadcast at DAGScheduler.scala:874
16/04/29 12:46:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 58 (ShuffledRDD[73] at reduceByKey at JavaKafkaWordCount.java:69)
16/04/29 12:46:30 INFO YarnScheduler: Adding task set 58.0 with 1 tasks
16/04/29 12:46:30 INFO TaskSetManager: Starting task 0.0 in stage 58.0 (TID 101, sandbox.hortonworks.com, PROCESS_LOCAL, 1249 bytes)
16/04/29 12:46:30 INFO BlockManagerInfo: Added broadcast_30_piece0 in memory on sandbox.hortonworks.com:53604 (size: 1521.0 B, free: 265.4 MB)
16/04/29 12:46:30 INFO TaskSetManager: Finished task 0.0 in stage 58.0 (TID 101) in 44 ms on sandbox.hortonworks.com (1/1)
16/04/29 12:46:30 INFO YarnScheduler: Removed TaskSet 58.0, whose tasks have all completed, from pool
16/04/29 12:46:30 INFO DAGScheduler: ResultStage 58 (print at JavaKafkaWordCount.java:76) finished in 0.027 s
16/04/29 12:46:30 INFO DAGScheduler: Job 29 finished: print at JavaKafkaWordCount.java:76, took 0.071551 s
-------------------------------------------
Time: 1461933990000 ms
-------------------------------------------


16/04/29 12:46:30 INFO JobScheduler: Finished job streaming job 1461933990000 ms.0 from job set of time 1461933990000 ms
16/04/29 12:46:30 INFO JobScheduler: Total delay: 0.195 s for time 1461933990000 ms (execution: 0.173 s)
16/04/29 12:46:30 INFO ShuffledRDD: Removing RDD 68 from persistence list
16/04/29 12:46:30 INFO BlockManager: Removing RDD 68
16/04/29 12:46:30 INFO MapPartitionsRDD: Removing RDD 67 from persistence list
16/04/29 12:46:30 INFO BlockManager: Removing RDD 67
16/04/29 12:46:30 INFO MapPartitionsRDD: Removing RDD 66 from persistence list
16/04/29 12:46:30 INFO BlockManager: Removing RDD 66
16/04/29 12:46:30 INFO MapPartitionsRDD: Removing RDD 65 from persistence list
16/04/29 12:46:30 INFO BlockManager: Removing RDD 65
16/04/29 12:46:30 INFO BlockRDD: Removing RDD 64 from persistence list
16/04/29 12:46:30 INFO BlockManager: Removing RDD 64
16/04/29 12:46:30 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[64] at createStream at JavaKafkaWordCount.java:47 of time 1461933990000 ms
16/04/29 12:46:30 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1461933986000 ms)
16/04/29 12:46:30 INFO InputInfoTracker: remove old batch metadata: 1461933986000 ms
^C16/04/29 12:46:30 INFO YarnExtensionServices: In shutdown hook for org.apache.spark.scheduler.cluster.YarnExtensionServices$$anon$1@ab30c55
16/04/29 12:46:30 INFO YarnHistoryService: Shutting down: pushing out 0 events
16/04/29 12:46:31 INFO YarnHistoryService: Event handler thread stopping the service
16/04/29 12:46:31 INFO YarnHistoryService: Stopping dequeue service, final queue size is 1
16/04/29 12:46:31 WARN YarnHistoryService: Did not finish flushing actionQueue before stopping ATSService, eventQueueBacklog= 1
16/04/29 12:46:31 INFO YarnHistoryService: Stopped: Service History Service in state History Service: STOPPED endpoint=http://sandbox.hortonworks.com:8188/ws/v1/timeline/; bonded to ATS=false; listening=true; batchSize=3; flush count=132; current queue size=1; total number queued=399, processed=396; post failures=0;
16/04/29 12:46:31 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/04/29 12:46:31 INFO ReceiverTracker: Sent stop signal to all 0 receivers
16/04/29 12:46:31 INFO ReceiverTracker: All of the receivers have deregistered successfully
16/04/29 12:46:31 INFO ReceiverTracker: ReceiverTracker stopped
16/04/29 12:46:31 INFO JobGenerator: Stopping JobGenerator immediately
16/04/29 12:46:31 INFO RecurringTimer: Stopped timer for JobGenerator after time 1461933990000
16/04/29 12:46:31 INFO JobGenerator: Stopped JobGenerator
16/04/29 12:46:31 INFO JobScheduler: Stopped JobScheduler
16/04/29 12:46:31 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
16/04/29 12:46:31 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
16/04/29 12:46:31 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/streaming,null}
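
(To see non-zero counts in those batches, a producer has to push text into the topic the job consumes. Assuming HDP sandbox defaults, with the Kafka broker on port 6667, scripts under /usr/hdp/current/kafka-broker/bin, and a placeholder topic name "test", something like:)

# create the topic if it does not already exist
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper sandbox.hortonworks.com:2181 \
  --replication-factor 1 --partitions 1 --topic test

# type lines here; each word is counted in the next 2-second batch
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list sandbox.hortonworks.com:6667 --topic test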


I appreciate all your support, Emil. You are really a champ and a great colleague.

avatar
Contributor

Done :), thanks for your effort.