
How to extract the Records processed in a Spark streaming Batch

Expert Contributor

Hi

I am using NiFi to stream CSV files to Spark Streaming. Within Spark I register and override a streaming listener to get batch-related information, which I write to a file: Spark Streaming Listener. So for each batch I can get the start time, end time, scheduling delay, processing time, number of records, etc. What I want to know is exactly which files were processed in a batch, so I would like to output the batch info mentioned above together with an array of UUIDs for all files processed in that batch (the UUID can be a file attribute or, if need be, can be inside the content of the file as well). I don't think I can pass the DStream RDD to the listener. Any suggestions?
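To be concrete, the listener I register looks roughly like the sketch below (a minimal sketch only; the class name and the println output are mine, the fields come from org.apache.spark.streaming.scheduler.BatchInfo):

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  // Minimal sketch of a batch listener: logs per-batch metrics when a batch finishes.
  class BatchInfoListener extends StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      val info = batchCompleted.batchInfo
      println(s"batchTime=${info.batchTime} " +
        s"numRecords=${info.numRecords} " +
        s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)} ms " +
        s"processingDelay=${info.processingDelay.getOrElse(-1L)} ms")
    }
  }

  // Registered on the StreamingContext (ssc):
  // ssc.addStreamingListener(new BatchInfoListener)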

Thanks

1 ACCEPTED SOLUTION

Expert Contributor

Hi

After a bit of searching I found that I can write each DStream RDD to a specified path using the saveAsTextFile method within the foreachRDD action. The problem is that this writes the partitions of the RDD to that location. If you have 3 partitions in the RDD, you will have something like

  1. part-0000
  2. part-0001
  3. part-0002

and these would be overwritten when the next batch starts, meaning that if the following batch has only 1 partition, part-0001 and part-0002 will be deleted and part-0000 will be overwritten with the new data. I have seen that people have written code to merge these files. As I wanted the data for each batch and did not want to lose any of it, I specified the path as follows:

fileIDs.foreachRDD(rdd => rdd.saveAsTextFile("/home/arsalan/SparkRDDData/" + ssc.sparkContext.applicationId + "/" + System.currentTimeMillis()))

This way a new folder is created for each batch. Later I can get the data for each batch and don't have to worry about finding ways to avoid the files being overwritten.
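A possible refinement along the same lines (just a sketch, I have not tested this variant here): the two-argument overload of foreachRDD also passes in the batch time, so the folder can be keyed by the batch itself instead of the wall clock:

  fileIDs.foreachRDD { (rdd, batchTime) =>
    // One directory per batch, named after the batch time in milliseconds,
    // so output from earlier batches is never overwritten.
    rdd.saveAsTextFile("/home/arsalan/SparkRDDData/" + ssc.sparkContext.applicationId + "/" + batchTime.milliseconds)
  }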
