Reply
New Contributor
Posts: 1
Registered: ‎09-13-2018

Spark Streaming give nullpointer Exception on JSON read (CDH 5.7)

[ Edited ]

I am trying to read data from kafka producer in json format on development server(CDH 5.714). i have 1 zookeeper and 3 brokers.Code is working there and i am getting results.But when i was tried on production(5.14) with 3 zookeeper and 3 bootstrap server i am getting null pointer error.here is mine code:

import _root_.kafka.serializer.DefaultDecoder

import _root_.kafka.serializer.StringDecoder
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming._

sc.setLogLevel("ERROR") 
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaTopic = "test2"
val topicsSet = kafkaTopic.split(",").toSet
val kafkaParams = Map[String, String] ("metadata.broker.list" -> "xxx:9092,xxx:9092,xxx:9092","zookeeper.connection.timeout.ms" -> "1000")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val dStream_final=stream.map(_._2) 
   dStream_final.foreachRDD( rdd => {
      if(!rdd.isEmpty){
          dataFrame = sqlContext.read.json(rdd)
          dataFrame.show()
         }
  })
 ssc.start()

 

Here is json object that i am producing:

 

{ "name":"John", "age":30, "car":null }

 This is the exception i am getting when i am running my code:

 

18/09/13 15:26:55 ERROR scheduler.JobScheduler: Error running job streaming job 1536834415000 ms.0
java.lang.NullPointerException
    at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
Announcements