Spark, how to parse inputDStream to case class with nested generic

Hi, I am working on a Spark Streaming application that reads from a Kafka topic containing JSON messages and parses them into case classes for further processing. The case class structure is as follows:

case class KafkaPayload[T](
  headerInfo: String,
  data: T
)

case class DataSourceA(
  field1: String,
  field2: String
)

case class DataSourceB(
  fieldA: String,
  fieldB: String
)

Where T is the data from one of several sources writing to the Kafka topic, each with its own structure and fields.
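
For reference, the incoming JSON messages look roughly like this (an assumed example, with values taken from the output further down):

{"headerInfo": "test", "data": {"field1": "one", "field2": "two"}}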

The parsing function is as follows:

import scala.reflect.{ClassTag, classTag}

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.DStream

def parseRecords[T: ClassTag](incomingStream: DStream[ConsumerRecord[String, String]]): DStream[T] = {

  val returnStream = incomingStream.mapPartitions(records => {

    // one mapper per partition, so it is not shipped inside the task closure
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)

    records.flatMap(record => {
      try {
        // runtimeClass is the erased class, so the nested type parameter is lost here
        Some(mapper.readValue(record.value(), classTag[T].runtimeClass))
      }
      catch {
        case e: Exception => None // drop records that fail to parse
      }
    })
  }, preservePartitioning = true)

  returnStream.asInstanceOf[DStream[T]]
}

and the calling code:

val aRecords = KafkaFunctions.parseRecords[KafkaPayload[DataSourceA]](JsonRecords)
val bRecords = KafkaFunctions.parseRecords[KafkaPayload[DataSourceB]](JsonRecords)

But I am running into issues where the nested data field comes back as a Map rather than the case class:

Some(KafkaPayload(test,Map(field1 -> one, field2 -> two)))
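
What I am expecting instead is something like Some(KafkaPayload(test,DataSourceA(one,two))). I believe this is down to type erasure: the ClassTag only carries the erased class, so Jackson never sees the inner type. A minimal illustration of my understanding:

import scala.reflect.classTag

val cls = classTag[KafkaPayload[DataSourceA]].runtimeClass
// cls is classOf[KafkaPayload[_]] -- the DataSourceA parameter is erased,
// so Jackson binds the untyped "data" field to a Map instead of DataSourceA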

I have tried altering the function to take a TypeReference from

com.fasterxml.jackson.core.`type`.TypeReference

but this causes serialization problems.
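
Roughly, the altered version looked like this (a minimal sketch, assuming the TypeReference is created at the call site; the name parseRecordsWithTypeRef is just for illustration):

import scala.reflect.ClassTag

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.DStream

def parseRecordsWithTypeRef[T: ClassTag](incomingStream: DStream[ConsumerRecord[String, String]],
                                         typeRef: TypeReference[T]): DStream[T] = {
  incomingStream.mapPartitions(records => {
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)

    records.flatMap(record =>
      try {
        // readValue with a TypeReference keeps the full generic type, e.g. KafkaPayload[DataSourceA]
        Some(mapper.readValue[T](record.value(), typeRef))
      } catch {
        case e: Exception => None
      }
    )
  }, preservePartitioning = true)
}

// called with an anonymous subclass:
// val aRecords = parseRecordsWithTypeRef(JsonRecords, new TypeReference[KafkaPayload[DataSourceA]] {})

As far as I can tell, the anonymous TypeReference subclass gets pulled into the Spark task closure, and since TypeReference is not Serializable this is where the serialization problems come from.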

Is there any way around this, or a neater solution to this problem?

Thanks,