
Issue with using spark-streaming: "kafka010.KafkaRDD: Kafka ConsumerRecord is not serializable"


I'm using Spark Streaming with Kafka on CDH 5.14.0 (Parcel install) with Spark 2 (SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809).

When I try to consume Kafka messages and write them to HDFS, I get errors like this:

 

ERROR kafka010.KafkaRDD: Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
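
If I'm reading that message right, it wants the fields pulled out of the ConsumerRecord before the stream is persisted or windowed, so that only plain values get serialized. This sketch is my understanding of what it means (keeping just key and value is my guess; stream is the DStream returned by createDirectStream below):

// What the log message seems to ask for (my understanding, not verified):
// extract plain, serializable fields from each ConsumerRecord before any
// persist / window / checkpoint happens on the stream.
val plainStream = stream.map(record => (record.key, record.value))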

 

My Spark Session Code:

def configSparkInfo(appName: String): SparkSession = {
    val conf = new SparkConf()
    // Use Kryo for Spark's internal serialization
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession
      .builder
      .config(conf)
      .appName(appName)
      .enableHiveSupport()
      .getOrCreate()
    spark
}

 

Consuming Kafka Code:

 

// Imports used by this snippet (project-specific helpers such as CommonConstant,
// kafkaConfigure, getOffsetByMySQL, setOffsets, getTopicPartitionNum, initOffsets
// and saveOffset are not shown)
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, LocationStrategies}

def consumerKafka2Hive(appName: String, kafkaGroup: String, kafkaTopic: String, tableName: String, batchHandleTime: Int,
                       func: (RDD[ConsumerRecord[String, String]], SparkSession) => DataFrameWriter[Row], hdfsSavePath: String): Unit = {

    val spark = configSparkInfo(appName)
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    val ssc = new StreamingContext(sparkContext, Seconds(batchHandleTime))  // batch duration in seconds (10s in my case)

    ssc.checkpoint("hdfs:///user/checkpoint/" + kafkaTopic + "/" + kafkaGroup)

    val brokers = CommonConstant.KAFKA_SERVER_ONLINE
    val kafkaParams = kafkaConfigure(kafkaGroup, brokers)

    // Resume from offsets stored in MySQL, or start every partition at 0 on the first run
    var fromOffset: Map[TopicPartition, Long] = Map()
    val offsetList = getOffsetByMySQL(kafkaTopic, kafkaGroup)
    if (offsetList != null) {
      fromOffset = setOffsets(offsetList)
    } else {
      val sizeOfPartition = getTopicPartitionNum(kafkaTopic)
      var partition = 0
      while (partition < sizeOfPartition) {
        fromOffset += (new TopicPartition(kafkaTopic, partition) -> 0L)
        partition = partition + 1
      }
      initOffsets(kafkaTopic, kafkaGroup, sizeOfPartition)
    }

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset)
    ).checkpoint(Seconds(batchHandleTime * 5))  // checkpoint the DStream every 5 batches

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      func(rdd, spark).format("parquet").mode(SaveMode.Append).save(hdfsSavePath)
      saveOffset(offsetRanges, kafkaTopic, kafkaGroup)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
}
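
For context, func is supposed to turn the raw ConsumerRecords into a DataFrameWriter[Row]. A minimal sketch of what such a func could look like (the column names and the value-as-string handling are placeholders, not my real implementation):

// Hypothetical func implementation: extract plain fields from each record first,
// then build a DataFrame and hand back its writer. Column names are placeholders.
def recordsToWriter(rdd: RDD[ConsumerRecord[String, String]],
                    spark: SparkSession): DataFrameWriter[Row] = {
  import spark.implicits._
  rdd.map(record => (record.key, record.value))  // keep only serializable fields
    .toDF("key", "value")
    .write
}

Something like this would be passed as the func argument to consumerKafka2Hive, and the caller then appends .format("parquet").mode(SaveMode.Append).save(...) as shown above.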

 

Maven dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0.cloudera2</version>
</dependency>


Any suggestions? Thanks.


Re: Issue with using spark-streaming: "kafka010.KafkaRDD: Kafka ConsumerRecord is not serializable"

I have the same issue, even the same error logs. Hope someone can give a suggestion for this.
