Hi,

I am trying to write the Kafka messages to a file, so I have added an output section to the traffic window step in traffic.conf, as below:
trafficwindow {
  dependencies = [traffic]
  deriver {
    type = sql
    query.literal = """
      SELECT UNIX_TIMESTAMP() * 1000 as_of_time,
             ROUND(AVG(number_of_vehicles), 2) avg_num_veh,
             MIN(number_of_vehicles) min_num_veh,
             MAX(number_of_vehicles) max_num_veh,
             MIN(measurement_time) first_meas_time,
             MAX(measurement_time) last_meas_time
      FROM traffic"""
  }
  planner = {
    type = overwrite
  }
  output = {
    type = filesystem
    // The output directory
    path = "c:\\temp"
    format = parquet
  }
}
When I run the traffic example, I get the error below:
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = traffic, partition = 0, offset = 2022, CreateTime = 1550606083979, checksum = 3592114955, serialized key size = -1, serialized value size = 16, key = null, value = 1550606083979,23))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
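For what it's worth, my understanding of the error is that Spark's default Java serializer rejects any object that does not implement java.io.Serializable, and ConsumerRecord does not. A minimal standalone sketch of that behavior (FakeRecord is a made-up stand-in for ConsumerRecord, not Envelope or Kafka code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Stand-in for ConsumerRecord: a plain class that does NOT
// implement java.io.Serializable.
class FakeRecord {
    final String value;
    FakeRecord(String value) { this.value = value; }
}

public class SerDemo {
    // Returns true if Java serialization accepts the object,
    // false if it throws NotSerializableException.
    public static boolean serializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeRecord record = new FakeRecord("1550606083979,23");
        // The record itself cannot be serialized...
        System.out.println(serializable(record));        // prints false
        // ...but its extracted String value can.
        System.out.println(serializable(record.value));  // prints true
    }
}
```

So my guess is that caching the raw ConsumerRecord objects is what fails, and only the extracted values could be persisted, but I do not know where (or whether) Envelope lets me control that.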
Is this something that is not supported as of now, or am I adding the output in the wrong place?