Support Questions


SparkException org.apache.spark.SparkException: Job aborted due to stage failure: java.lang.NullPointerException

Rising Star

I'm facing an issue running a Spark job on a Hadoop/YARN cluster: it runs fine in local mode but fails in cluster mode due to this NullPointerException. I'm using Spark 1.6.2 and Scala 2.10.6 both locally and on the cluster. The application is a streaming application that streams data from Kafka. I'm able to get data for some of the batches, but for others I get NullPointerExceptions, and as they pile up the job fails. Here is the code snippet of where it fails.

I'm new to Spark, so if someone could please point me in the right direction it would be really helpful.

Note: From the research I have done, within map it is iterating over the elements of the DStream and hits a null when the next element is null, but as per the Spark documentation empty batches should not cause an issue.

DevMain.scala   
Line 1 val lines: DStream[(String,Array[Byte])] = myConsumer.createDefaultStream()      

Line 2 val keyDeLines = lines.map(lme.aParser);
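
As a side note, a minimal null guard (a rough sketch on my side, assuming the offending elements are null tuples or null payloads, which I have not confirmed) would be to filter before the map:

    // hedged sketch: drop null tuples/payloads before they reach the parser
    val safeLines = lines.filter(x => x != null && x._2 != null)
    val keyDeLines = safeLines.map(lme.aParser)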
---------------------------------------------------------------------------
This is createDefaultStream():


    def createDefaultStream(): DStream[(String,Array[Byte])] = {
        val consumerConfProps = List("zookeeper.connect","group.id","zookeeper.connection.timeout.ms")
        val kafkaConf = Utils.getSubProps(props,consumerConfProps)
        val topicArray = props.getProperty("topics").split(",")
        val topicMap = {
          topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
        }
        KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
          kafkaConf,
          topicMap,
          StorageLevel.MEMORY_ONLY_SER
        )
    }
--------------------------------------------------------------------------
This is lme.aParser:


    def aParser(x: (String,Array[Byte])): Option[Map[String,Any]] = {
        logInfo("Entered lme: ")
        val decodeTry = Injection.invert(x._2)
        decodeTry match {
          case Failure(e) => {
            logInfo(s"Could not decode binary data: " + e.getStackTrace)
            None
          }
          case Success(eventPojo) => {
            val bs: String = eventPojo.toString
            logInfo("json: " + bs)
          }
    }
The code never enters the lme.aParser function in the NullPointerException cases; I have put logging on line 1 of lme.aParser.
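
To narrow it down, a guard like this (a rough, untested sketch using the same lines/lme names as above) could at least log whether the offending element is a null tuple or a null payload, or whether the NPE comes from something else in the closure:

    // hedged sketch: log null records instead of letting them blow up the stage
    val keyDeLines = lines.map { x =>
      if (x == null || x._2 == null) {
        logInfo("Null record received from Kafka: " + x)
        None
      } else {
        lme.aParser(x)
      }
    }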
Here is the stack trace:


    java.lang.NullPointerException 
    at DevMain$anonfun$5.apply(DevMain.scala:2)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:389)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
    at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)



6 REPLIES

Super Guru
@Aditya Mamidala

Could you please share your properties file?

Rising Star

Thanks for the reply, Rajkumar. Could you please let me know which properties file? Do you mean yarn-site.xml? I'm specifically not using any properties file; this is how I'm running the job:

spark-submit --verbose --class DevMain --master yarn-cluster --deploy-mode cluster --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --files "hdfs://hdfs-name-node:8020/user/hadoopuser/log4j.properties" hdfs://hdfs-name-node:8020/user/hadoopuser/streaming_2.10-1.0.0-SNAPSHOT.jar hdfs://hdfs-name-node:8020/user/hadoopuser/enriched.properties

Super Guru

@Aditya Mamidala

Could you edit your question to separate problem description and text from code? You have some text at the end of the code block. It will be easier for contributors to respond. Additionally, please add more of the exception block (especially the CAUSED BY block). How many brokers in the cluster? Thanks.

Rising Star

Thanks for pointing that out, Constantin. I have corrected it, and sure, I have added the exception block. We are running on a 4-node cluster.

Rising Star
java.lang.NullPointerException
	at DevMain$anonfun$5.apply(DevMain.scala:2)
	at (DevMain.scala:2)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:389)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
	at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
	at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
	at DevMain$anonfun$10.apply(DevMain.scala:2)
	at DevMain$anonfun$10.apply(DevMain.scala:2)
	at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at DevMain$anonfun$5.apply(DevMain.scala:2)
	at (DevMain.scala:2)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:389)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
	at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
	at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)

Super Guru

As your question states and as this exception block shows:

Caused by: java.lang.NullPointerException

	at DevMain$anonfun$5.apply(DevMain.scala:2)
	at (DevMain.scala:2)
	at scala.collection.Iterator$anon$11.next(Iterator.scala:328)

Hard to guess without debugging the code. My no-brainer advice :) is to debug your code line by line to detect where you pass a null value where a non-null value is expected.
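
If it helps, this is the kind of quick check I mean (a hedged sketch only; I have not run it against your job, and it assumes the lines DStream from your snippet):

    // rough diagnostic: count null tuples/payloads per partition for each batch
    lines.foreachRDD { rdd =>
      val nullCounts = rdd.mapPartitions { it =>
        Iterator.single(it.count(x => x == null || x._2 == null))
      }.collect()
      println("Null records per partition: " + nullCounts.mkString(", "))
    }

If those counts stay at zero while the NPE keeps appearing, the null is more likely a reference used inside the closure (for example an object that is not initialized on the executors) than the Kafka records themselves.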