Created 12-24-2016 11:15 PM
I'm facing an issue running a Spark job on a Hadoop/YARN cluster. It runs fine in local mode but fails in cluster mode with a NullPointerException. I'm using Spark 1.6.2 and Scala 2.10.6 both locally and on the cluster. The application is a streaming application that streams data from Kafka. I'm able to get data for some of the batches, but for others I get NullPointerExceptions, and as they pile up the job fails. Below is the code snippet where it fails.
I'm new to Spark, so if someone could point me in the right direction it would be really helpful.
Note: From the research I have done, within map Spark is iterating over the elements of the DStream and hits the NullPointerException when the next element is null, but as per the Spark documentation empty batches should not cause an issue.
DevMain.scala
    Line 1: val lines: DStream[(String, Array[Byte])] = myConsumer.createDefaultStream()
    Line 2: val keyDeLines = lines.map(lme.aParser)

---------------------------------------------------------------------------

This is createDefaultStream():

    def createDefaultStream(): DStream[(String, Array[Byte])] = {
      val consumerConfProps = List("zookeeper.connect", "group.id", "zookeeper.connection.timeout.ms")
      val kafkaConf = Utils.getSubProps(props, consumerConfProps)
      val topicArray = props.getProperty("topics").split(",")
      val topicMap = topicArray.map((_, props.getProperty("numthreads").toInt)).toMap
      KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER)
    }

--------------------------------------------------------------------------

This is lme.aParser:

    def aParser(x: (String, Array[Byte])): Option[Map[String, Any]] = {
      logInfo("Entered lme: ")
      val decodeTry = Injection.invert(x._2)
      decodeTry match {
        case Failure(e) =>
          logInfo("Could not decode binary data: " + e.getStackTrace)
          None
        case Success(eventPojo) =>
          val bs: String = eventPojo.toString
          logInfo("json: " + bs)
          // ...
      }
    }

The code never enters the lme.aParser function in the null pointer cases; I have put logging on line 1 of aParser. Here is the stack trace:

    java.lang.NullPointerException
        at DevMain$$anonfun$5.apply(DevMain.scala:2)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
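Update: since the trace shows the NPE thrown inside the iterator that drives lines.map, before aParser is ever entered, I am experimenting with a defensive guard in front of the map. This is only a sketch built on the names above (lines, lme.aParser); the null checks are my own guess at the cause, not something the documentation confirms:

    // Hypothetical guard: drop records whose tuple or payload is null before
    // mapping, so a single bad Kafka record cannot kill the whole batch.
    val nonNull = lines.filter(rec => rec != null && rec._2 != null)
    val keyDeLines = nonNull.map(lme.aParser)

If the job stops failing with this guard in place, that would confirm that null keys or values are coming off the Kafka stream.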
Created 12-25-2016 03:50 PM
Could you please share your properties file?
Created 12-25-2016 05:21 PM
Thanks for the reply, Rajkumar. Could you please let me know which properties file? Do you mean yarn-site.xml? I'm specifically not using any properties file; this is how I'm running the job:
spark-submit --verbose --class DevMain --master yarn-cluster --deploy-mode cluster --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --files "hdfs://hdfs-name-node:8020/user/hadoopuser/log4j.properties" hdfs://hdfs-name-node:8020/user/hadoopuser/streaming_2.10-1.0.0-SNAPSHOT.jar hdfs://hdfs-name-node:8020/user/hadoopuser/enriched.properties
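The enriched.properties at the end of that command line is just an application argument holding the Kafka consumer settings; the driver loads it into the props object that createDefaultStream reads, roughly like this (reconstructed from memory, so the exact code may differ):

    import java.util.Properties
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // args(0) is the hdfs:// path to enriched.properties from the spark-submit line above.
    val props = new Properties()
    val path = new Path(args(0))
    val fs = path.getFileSystem(new Configuration())
    val in = fs.open(path)
    try props.load(in) finally in.close()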
Created 12-25-2016 03:57 PM
Could you edit your question to separate problem description and text from code? You have some text at the end of the code block. It will be easier for contributors to respond. Additionally, please add more of the exception block (especially the CAUSED BY block). How many brokers in the cluster? Thanks.
Created 12-25-2016 05:23 PM
Thanks for pointing that out, Constantin; I have corrected it. Sure, I have added the exception block. Running on a 4-node cluster.
Created 12-25-2016 05:33 PM
    java.lang.NullPointerException
        at DevMain$$anonfun$5.apply(DevMain.scala:2)
        at DevMain$$anonfun$5.apply(DevMain.scala:2)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
        at DevMain$$anonfun$10.apply(DevMain.scala:2)
        at DevMain$$anonfun$10.apply(DevMain.scala:2)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
        at DevMain$$anonfun$5.apply(DevMain.scala:2)
        at DevMain$$anonfun$5.apply(DevMain.scala:2)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
Created 12-26-2016 09:44 PM
As your question states and this exception block shows:
Caused by: java.lang.NullPointerException
    at DevMain$$anonfun$5.apply(DevMain.scala:2)
    at DevMain$$anonfun$5.apply(DevMain.scala:2)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
Hard to guess without debugging the code. My no-brainer advice :): debug your code line by line to detect where you pass a null value when a non-null value is expected.
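If it helps, one cheap way to do that without a debugger is to make the parsing stage null-tolerant and log the offending record. Something along these lines, an untested sketch against the code in the question, reusing its lines and lme.aParser names:

    // Sketch: replace the bare map with a flatMap that logs and skips null
    // records, so one bad record shows up in the executor logs instead of
    // failing the whole batch.
    val keyDeLines = lines.flatMap { rec =>
      if (rec == null || rec._2 == null) {
        println(s"Skipping null record: $rec")  // or your logInfo
        None
      } else {
        lme.aParser(rec)
      }
    }

Because aParser already returns an Option, the flatMap also flattens the Nones from failed decodes out of the stream for free.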