
Keeping track of filenames with Spark Streaming


Contributor

I am currently developing a Spark Streaming application in Python to watch a directory and load new files into a Phoenix table. That part is fairly straightforward using the Spark-Phoenix integration.

However, for auditing purposes I need to keep track of which files are being loaded and how many records come from each of them. textFileStream does not record the filename in the elements of the RDD, and the only way I have found so far to get the filenames is from the RDD's debug string. While this does give me the filenames, it in no way indicates which elements belong to which files. What would be ideal is something like SparkContext's wholeTextFiles(...), which gives back an RDD of (filename, contents) pairs, whereas textFileStream acts more like SparkContext's textFile(...).
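For reference, this is the batch-mode behaviour I am after (a minimal sketch; the directory path is a placeholder):

from pyspark import SparkContext

sc = SparkContext(appName="WholeTextFilesExample")
# wholeTextFiles yields one (filename, contents) pair per file.
pairs = sc.wholeTextFiles("<your_directory>")
for filename, contents in pairs.take(5):
    print(filename, len(contents))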

Is there any reasonable way to determine which elements belong to which files?


Re: Keeping track of filenames with Spark Streaming

Contributor

In the hopes that this will help somebody else in the future, I will post what I have discovered. The first important realisation I had was that each file was being loaded into its own partition within the underlying RDD, and that the RDD's debug string was showing the source for each of these partitions. Knowing that, it's fairly straightforward to get the names for each element.
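To see this for yourself, print the lineage of each batch RDD (a quick sketch, assuming stream is the DStream created below; the exact layout of the debug output varies between Spark versions, so check yours before parsing it):

def inspect(rdd):
    # Each partition's line in the lineage names the file it was read from.
    print(rdd.toDebugString())

stream.foreachRDD(inspect)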

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def f(rdd):
    # The debug string lists one source per partition. Skip the first two
    # header lines and pull the filename out of each remaining line.
    # (The decode() is for Python 3, where toDebugString() returns bytes.)
    debug = rdd.toDebugString().decode("utf-8")
    lines = debug.split("\n")[2:]
    filenames = [l.split()[1].split("/")[-1] for l in lines]

    def mapIndex(index, iterator):
        # Each file occupies exactly one partition, so the partition index
        # maps straight onto the filenames list built for this batch.
        values = list(iterator)
        yield (filenames[index], values)

    if not rdd.isEmpty():
        rdd = rdd.mapPartitionsWithIndex(mapIndex)
        # ... continue processing the (filename, lines) pairs here

if __name__ == "__main__":
    sc = SparkContext(appName="FilenameTracking")
    streamingContext = StreamingContext(sc, 10)  # 10-second batches
    stream = streamingContext.textFileStream("<your_directory>")
    stream.foreachRDD(f)
    streamingContext.start()
    streamingContext.awaitTermination()

The RDD initially contains only the raw lines, with no filenames attached. After mapPartitionsWithIndex runs, it contains pairs of the form <filename, (line0, line1, line2, ..., lineN)>. If you want one pair per line instead, do the following:

rdd = rdd.flatMapValues(lambda x: x)

The RDD now contains pairs of the form <filename, line> for each line in each file.
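And since the original goal was auditing, those pairs reduce to per-file record counts directly (a small sketch on the flattened <filename, line> RDD):

# Count how many records came from each file in this batch.
counts = rdd.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
for filename, n in counts.collect():
    print(filename, n)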

I hope this will help at least one other person.
