Spark, how to parse inputDStream to case class with nested generic

Hi, I am working on a spark-streaming application that reads from a kafka queue containing json files and parses these files into case classes for further processing. The case class structure is as follows:

case class KafkaPayload[T](
                           headerInfo: String,
                           data: T

case class DataSourceA(
			field1: String,
			field2: String
case class DataSourceB(
			fieldA: String,
			fieldB: String

Where T is data from different sources that write to the kafka topic which have different structures/fields

The parsing function is as follows:

def parseRecords[T: ClassTag](incomingStream: DStream[ConsumerRecord[String, String]]): DStream[T] = {

  val returnStream = incomingStream.mapPartitions(records => {

    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

    records.flatMap(record => {
      try {
        Some(mapper.readValue(record.value(), classTag[T].runtimeClass))
      catch {
        case e: Exception => None
  }, preservePartitioning = true)

  return returnStream.asInstanceOf[DStream[T]]

and the calling code:

val aRecords = KafkaFunctions.parseRecords[KafkaPayload[DataSourceA]](JsonRecords)
val bRecords = KafkaFunctions.parseRecords[KafkaPayload[DataSourceB]](JsonRecords)

But I am running into issues where the data comes back as:

Some(KafkaPayload(test,Map(field1 -> one, field2 -> two)))

I have tried altering the function to take a TypeReference from


but this causes serialization problems,

Is there anyway around this, or a neater solution for this problem?