Spark Streaming give nullpointer Exception on JSON read (CDH 5.7)

I am trying to read data from a Kafka producer in JSON format on our development server (CDH 5.714), which has 1 ZooKeeper and 3 brokers. The code works there and I get results. But when I tried it on production (CDH 5.14), with 3 ZooKeeper nodes and 3 bootstrap servers, I get a NullPointerException. Here is my code:

import _root_.kafka.serializer.DefaultDecoder

import _root_.kafka.serializer.StringDecoder
import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
import collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming._

sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaTopic = "test2"
val topicsSet = kafkaTopic.split(",").toSet
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "xxx:9092,xxx:9092,xxx:9092",
  "zookeeper.connection.timeout.ms" -> "1000")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val dStream_final = stream.map(_._2)
dStream_final.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    val dataFrame = sqlContext.read.json(rdd)
    dataFrame.show()
  }
}
ssc.start()

 

Here is the JSON object I am producing:

 

{ "name":"John", "age":30, "car":null }

This is the exception I get when I run the code:

 

18/09/13 15:26:55 ERROR scheduler.JobScheduler: Error running job streaming job 1536834415000 ms.0
java.lang.NullPointerException
    at org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:554)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:540)
    at org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:539)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
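For context, the trace fails inside HiveContext initialization, not inside the JSON parsing itself. A pattern often suggested for this kind of NullPointerException on Spark 1.6 / CDH 5.x streaming jobs is to avoid capturing the shell's HiveContext-backed `sqlContext` in the streaming closure and instead obtain a singleton `SQLContext` from the RDD's own SparkContext inside `foreachRDD`. The sketch below assumes the same `dStream_final` as in the code above; it is a suggestion only, not verified on this cluster:

```scala
import org.apache.spark.sql.SQLContext

dStream_final.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // getOrCreate returns one lazily created SQLContext per JVM, so the
    // context is built on the driver at job time rather than captured
    // from the shell's (Hive-backed) sqlContext.
    val sqlCtx = SQLContext.getOrCreate(rdd.sparkContext)
    val df = sqlCtx.read.json(rdd)
    df.show()
  }
}
```

If Hive support is not needed for these DataFrames, this also sidesteps the Hive client configuration that the stack trace shows failing.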