New Contributor
Posts: 1
Registered: 11-30-2017

Kafka Spark streaming - createDirectStream

[ Edited ]

We are trying to land data from Kafka for 80+ topics using Spark Streaming. As part of this, the code below subscribes to Kafka for all of the topics, creates an individual folder per topic, and lands the data into it. The issue we are having is that the data is not being split per topic, i.e. the entire blob of data received from Kafka (for all 88 topics) is being landed into each individual topic folder. We assume we are missing a trick in the 'foreachRDD' block, so any help would be highly appreciated.

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topicsSet)

var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {

    loggingPlease(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}", log4jprops)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

    val df = rdd.toDF()
    val cal = Calendar.getInstance()

    //val topicName = o.topic //KAFKA_MM_LOAD_AAA_TABLE1
    //val savePath = parentFolder + "/" + o.topic.split("_")(4) + "/" + o.topic.split("_")(5) + "/" + cal.get(Calendar.YEAR).toString + (cal.get(Calendar.MONTH) + 1).toString + cal.get(Calendar.DAY_OF_MONTH).toString

    val savePath = parentFolder + "/" + o.topic.split("_")(3) + "AL01742/" + o.topic.split("_")(4) + "/DATA"
    //val savePath = parentFolder + "/" + o.topic + "/"

    df.repartition(1).write.mode(SaveMode.Append).avro(savePath)
    loggingPlease("RDD pulling completed", log4jprops)
  }
}
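Our current understanding (which may be wrong) is that rdd.toDF() inside the loop converts the whole batch, so every topic folder receives all records. Since in a direct stream the i-th RDD partition corresponds to offsetRanges(i), we think each record's topic can be recovered from its partition index, e.g. via rdd.mapPartitionsWithIndex, and the records grouped per topic before writing. Below is a minimal plain-Scala sketch of that tagging logic, with the RDD replaced by a Seq of partitions so it runs standalone; the topic names and rows are made up for illustration.

```scala
// Sketch of the per-partition topic tagging we believe is needed.
// In the real job this would be something like:
//   val tagged = rdd.mapPartitionsWithIndex { (i, iter) =>
//     val topic = offsetRanges(i).topic
//     iter.map(record => (topic, record))
//   }
// Here a Seq of Seqs stands in for the RDD's partitions.

case class OffsetRangeStub(topic: String, partition: Int)

// One stub per RDD partition, in the same order Spark reports them.
val offsetRanges = Array(
  OffsetRangeStub("KAFKA_MM_LOAD_AAA_TABLE1", 0),
  OffsetRangeStub("KAFKA_MM_LOAD_BBB_TABLE2", 0)
)

// One Seq per partition, aligned with offsetRanges by index.
val partitions: Seq[Seq[String]] = Seq(
  Seq("row-a1", "row-a2"),
  Seq("row-b1")
)

// Tag every record with the topic of its partition, then group,
// so each topic folder only receives its own records.
val tagged = partitions.zipWithIndex.flatMap { case (recs, i) =>
  recs.map(r => (offsetRanges(i).topic, r))
}
val byTopic: Map[String, Seq[String]] =
  tagged.groupBy(_._1).map { case (topic, pairs) => (topic, pairs.map(_._2)) }

byTopic.foreach { case (topic, recs) =>
  println(s"$topic -> ${recs.mkString(",")}")
}
```

With the real RDD, each per-topic group would then be converted to a DataFrame and written to its own savePath, instead of writing the whole batch once per offset range.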