I am currently developing a Spark Streaming application in Python to watch a directory and load new files into a Phoenix table. This part is fairly straightforward using the Spark-Phoenix integration.
However, for auditing purposes I need to keep track of which files are being loaded, and how many records come from each of them. The textFileStream does not record the filename in the elements of the RDD, and the only way I have found so far to get the filename is through the RDD's debug string. While this does give me the filenames, it in no way indicates which elements belong to which files. What would be ideal is something like SparkContext's wholeTextFiles(...), which gives back an RDD of the form (filename, contents), whereas textFileStream acts more like SparkContext's textFile(...).
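To show the difference in shape between the two APIs, here is a sketch using plain Python lists standing in for the RDDs (the paths are made up; no Spark needed):

```python
# sc.wholeTextFiles(dir) produces (filename, contents) pairs:
whole = [("file:/dir/a.txt", "line1\nline2"), ("file:/dir/b.txt", "line3")]

# sc.textFile(dir) -- and textFileStream -- produces bare lines,
# so the source file of each element is lost:
lines = [l for _, contents in whole for l in contents.split("\n")]
# -> ["line1", "line2", "line3"]
```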
Is there any reasonable way to determine which elements belong to which files?
In the hopes that this will help somebody else in the future, I will post what I have discovered. The first important realisation I had was that each file was being loaded into its own partition within the underlying RDD, and that the RDD's debug string was showing the source for each of these partitions. Knowing that, it's fairly straightforward to get the names for each element.
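To make that concrete, here is how the filename extraction looks against a debug string of roughly the shape I observed. The sample string and the token position of the path are assumptions; the exact layout can vary between Spark versions, so treat the parsing as a sketch:

```python
# A debug string of roughly this shape (layout may vary by Spark version):
debug = ("(2) MapPartitionsRDD[3] at textFileStream\n"
         " |  UnionRDD[2] at textFileStream\n"
         " |  file:/watched/dir/a.txt NewHadoopRDD[0]\n"
         " |  file:/watched/dir/b.txt NewHadoopRDD[1]")

# Skip the two header lines; each remaining line describes one partition,
# and the source path is the second whitespace-separated token.
filenames = [l.split()[1] for l in debug.split("\n")[2:]]
# -> ["file:/watched/dir/a.txt", "file:/watched/dir/b.txt"]
```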
def tagWithFilenames(rdd):
    if not rdd.isEmpty():
        # After two header lines, the debug string has one line per
        # partition, and each file was loaded into its own partition.
        debug = rdd.toDebugString()
        lines = debug.split("\n")[2:]
        filenames = []
        for l in lines:
            # The source path is the second whitespace-separated token,
            # e.g. " |  file:/your/dir/file.txt NewHadoopRDD[0]"
            # (the exact layout can vary between Spark versions).
            filenames.append(l.split()[1])

        def mapIndex(index, iterator):
            values = list(iterator)
            yield (filenames[index], values)

        rdd = rdd.mapPartitionsWithIndex(mapIndex)
        # ... write rdd to Phoenix and record the audit information ...

if __name__ == "__main__":
    # Set up SparkContext and StreamingContext
    stream = streamingContext.textFileStream(<your_directory>)
    stream.foreachRDD(tagWithFilenames)
The RDD initially contains the lines only. After the execution of mapPartitionsWithIndex, it contains pairs of the form <filename, (line0, line1, line2, ..., lineN)>. If you want the lines split out again, do the following:
rdd = rdd.flatMapValues(lambda x: x)
The RDD now contains pairs of the form <filename, line> for each line in each file.