Created on 09-29-2016 09:14 AM - edited 09-16-2022 03:42 AM
Dear Colleagues,
I've written a Spark Streaming job that reads messages from Kafka and writes them to HBase. The job runs fine in local mode but throws a NullPointerException in yarn-cluster mode.
User class threw exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cloudera01.mlslocal):
java.lang.NullPointerException
at com.mls.hadoop.messages.kafka.consumer.MlsHadoopJobSparkStreamNewKafkaToHBase.saveToHBase(MlsHadoopJobSparkStreamNewKafkaToHBase.java:163)
at com.mls.hadoop.messages.kafka.consumer.MlsHadoopJobSparkStreamNewKafkaToHBase$1$1.call(MlsHadoopJobSparkStreamNewKafkaToHBase.java:140)
at com.mls.hadoop.messages.kafka.consumer.MlsHadoopJobSparkStreamNewKafkaToHBase$1$1.call(MlsHadoopJobSparkStreamNewKafkaToHBase.java:113)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:332)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
The code snippet is the following:
kafkaStream.foreachRDD(new VoidFunction<JavaPairRDD<String, byte[]>>() {
    public void call(final JavaPairRDD<String, byte[]> rdd) throws Exception {
        rdd.foreach(new VoidFunction<Tuple2<String, byte[]>>() {
            public void call(Tuple2<String, byte[]> avroRecord) throws Exception {
Created 10-12-2016 06:21 PM
You may need to check that your RDD is not empty. Depending on your processing, empty batches in Spark Streaming can cause problems.
!rdd.isEmpty()
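The shape of that guard can be sketched in plain Java without a cluster. This is only an illustration under stated assumptions: each inner `List` stands in for one micro-batch RDD, and `EmptyBatchGuard`, `processBatches` and `save` are hypothetical names, not part of the original job. In the real job the equivalent check would be `rdd.isEmpty()` inside the `foreachRDD` callback, before `rdd.foreach(...)` is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Stand-in for the streaming job: each inner list plays the role of one micro-batch RDD.
class EmptyBatchGuard {

    // Hypothetical sink; in the real job this would be the HBase write.
    static void save(String record, List<String> sink) {
        sink.add(record);
    }

    // Processes only the batches that contain records and
    // returns how many batches were actually processed.
    static int processBatches(List<List<String>> batches, List<String> sink) {
        int processed = 0;
        for (List<String> batch : batches) {
            if (batch.isEmpty()) {
                // Same role as the !rdd.isEmpty() check in the Spark job.
                continue;
            }
            for (String record : batch) {
                save(record, sink);
            }
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c"));
        List<String> sink = new ArrayList<>();
        int processed = processBatches(batches, sink);
        // prints "2 batches, records [a, b, c]"
        System.out.println(processed + " batches, records " + sink);
    }
}
```

Note that this guard only skips empty batches. The thread does not confirm whether an empty batch is what triggers the NullPointerException at line 163 of `saveToHBase`.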