New Contributor
Posts: 12
Registered: ‎02-06-2019

Spark Exception: ConsumerRecord Not Serializable


Hi,

I am trying to write the Kafka messages to a file.

 

So I have added an output section to the traffic window step in traffic.conf, as below:

    trafficwindow {
        dependencies = [traffic]
        deriver {
            type = sql
            query.literal = """
                SELECT UNIX_TIMESTAMP() * 1000 as_of_time, ROUND(AVG(number_of_vehicles), 2) avg_num_veh,
                MIN(number_of_vehicles) min_num_veh, MAX(number_of_vehicles) max_num_veh,
                MIN(measurement_time) first_meas_time, MAX(measurement_time) last_meas_time FROM traffic"""
        }
        planner {
            type = overwrite
        }
        output {
            type = filesystem
            // The output directory
            path = "c:\\temp"
            format = parquet
        }
    }

 

When I run the traffic example I am getting the error below.

 

java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = traffic, partition = 0, offset = 2022, CreateTime = 1550606083979, checksum = 3592114955, serialized key size = -1, serialized value size = 16, key = null, value = 1550606083979,23))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)

 

 

Is this something that is not supported as of now? Or am I adding the output in the wrong place?

 

 

Cloudera Employee
Posts: 50
Registered: ‎08-26-2015

Re: Spark Exception: ConsumerRecord Not Serializable

Hi Shrini,

 

Apologies, I had tried to reply to your last post on the other thread but it seems like it never actually posted. My response was:

 

I am not sure why you are getting that error message. It may be difficult for me to assist on this one if you are not running Envelope on a Cloudera cluster, or if you are making modifications to the Envelope code, because it won't be practical for me to replicate the error.

 
Are you intending to use Envelope on a Cloudera cluster? If so then you might want to start from there now rather than trying to run Envelope on Windows with your own Spark and Kafka versions.
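For background on what the exception itself means (this is general Spark/Kafka behaviour, not a diagnosis of your specific setup): ConsumerRecord does not implement java.io.Serializable, so the error appears whenever Spark's JavaSerializer tries to write the raw Kafka records out, for example when caching them, as in the MemoryStore.putIteratorAsBytes frame in your stack trace. The same failure can be reproduced outside Spark with plain Java serialization; the stand-in class below is hypothetical, just mimicking a non-serializable record:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRecord,
// which likewise does not implement java.io.Serializable.
class NotSerializablePayload {
    final String value = "some message value";
}

public class SerializationDemo {
    // Attempt Java serialization of an object, the same mechanism
    // Spark's JavaSerializer uses under the hood.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            // The exception message is the offending class's name.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails, just like serializing a raw ConsumerRecord does in Spark.
        System.out.println(trySerialize(new NotSerializablePayload()));
        // Succeeds: String implements Serializable, which is why extracting
        // the record's key/value fields before caching avoids the error.
        System.out.println(trySerialize("strings implement Serializable"));
    }
}
```

This is also why the usual workaround in hand-written Spark code is to map the raw records down to their key/value fields before any operation that needs to serialize them.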
 
Jeremy