
Kafka Spark streaming - createDirectStream


We are trying to land data from Kafka for about 80+ topics using Spark Streaming. As part of this, the code below is meant to land the data for each topic into its own folder. With this code we can subscribe to Kafka for the 80+ topics, receive the data, create the individual folders, and land the data. The issue we are having is that the data is not being separated per topic: the entire blob of data received from Kafka (for all 88 topics) is being written into every individual topic folder. We assume we are missing a trick in the code below, where we are not handling 'foreachRDD' properly, so any help would be highly appreciated.

// Imports this snippet relies on (Kafka 0.8 direct stream API, spark-avro):
import java.util.Calendar
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.sql.SaveMode
import com.databricks.spark.avro._

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topicsSet)

var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  // capture the offset ranges of this batch on the driver
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    loggingPlease(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}", log4jprops)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

    // NOTE: rdd still holds the records of ALL topics in this batch
    val df  = rdd.toDF()
    val cal = Calendar.getInstance()

    //val topicName = o.topic // e.g. KAFKA_MM_LOAD_AAA_TABLE1
    //val savePath = parentFolder + "/" + o.topic.split("_")(4) + "/" + o.topic.split("_")(5) + "/" +
    //  cal.get(Calendar.YEAR).toString + (cal.get(Calendar.MONTH) + 1).toString + cal.get(Calendar.DAY_OF_MONTH).toString

    val savePath = parentFolder + "/" + o.topic.split("_")(3) + "AL01742/" + o.topic.split("_")(4) + "/DATA"

    //val savePath = parentFolder + "/" + o.topic + "/"
    df.repartition(1).write.mode(SaveMode.Append).avro(savePath)
    loggingPlease("RDD pulling completed", log4jprops)
  }
}
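
We were wondering whether the right approach is to filter each batch RDD by topic before converting it to a DataFrame, using the fact that with the direct stream, RDD partition i corresponds to offsetRanges(i). Below is only a rough sketch of what we mean, under that assumption and reusing the variables from the code above (parentFolder, the sqlContext implicits and spark-avro), with a simplified save path for illustration:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // With the direct stream, RDD partition i maps to offsetRanges(i),
  // so the partition index tells us which topic a partition belongs to.
  for (topic <- offsetRanges.map(_.topic).distinct) {
    val topicRdd = rdd.mapPartitionsWithIndex { (i, iter) =>
      if (offsetRanges(i).topic == topic) iter else Iterator.empty
    }
    val df = topicRdd.toDF()
    val savePath = parentFolder + "/" + topic   // simplified; the real path would be built as above
    df.repartition(1).write.mode(SaveMode.Append).avro(savePath)
  }
}

Is this the recommended way to split the batch per topic, or is there a better pattern for 80+ topics?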