How to iterate multiple HDFS files in Spark-Scala using a loop?

Expert Contributor

Problem:

I want to iterate over multiple HDFS files that have the same schema under one directory. I don't want to load them all at once because the data is far too big.

What I Tried:

I tried a shell-script for loop, but on each iteration spark-submit takes 15-30 seconds to initialize and allocate cluster resources. My script has to run 900 times for now, so saving those 15-30 seconds adds up to a lot, since each job finishes in about a minute.

I have looked all over for code that lists the HDFS files so that I can iterate through them in Scala, instead of re-submitting the job each time from the shell script.

4 REPLIES

@Adnan Alvee

You could use the wholeTextFiles() method on SparkContext from Spark's Scala API.

Here is a simple outline that avoids a separate spark-submit per file, saving you the 15-30 seconds of startup each time by iterating over all of the files within a single job.

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String): Unit = {
  println(file)

  // your logic for processing a single file goes here

  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // saving the processed RDD for this single file to HDFS goes here
}

files.collect.foreach(filename => doSomething(filename))

where:

  • data - org.apache.spark.rdd.RDD[(String, String)] of (filename, content) pairs
  • files - org.apache.spark.rdd.RDD[String] of filenames
  • doSomething(filename) - your per-file requirement/logic
  • HDFS_PATH - the HDFS path to your source directory (you can even restrict the import to certain kinds of files by specifying the path as "/hdfspath/*.csv")
  • sc - the SparkContext instance
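
One caveat worth flagging (my addition, not part of the original reply): wholeTextFiles() reads the full contents of every file just to get at the names, which can be costly when the data is too big to load at once. If you only need the listing, a lighter-weight sketch is to ask Hadoop's FileSystem API for the file names directly; this assumes the same "HDFS_PATH" placeholder and doSomething() from above:

import org.apache.hadoop.fs.{FileSystem, Path}

// list file names under the directory without reading any file contents;
// "HDFS_PATH" and doSomething(...) are the same placeholders as above
val fs = FileSystem.get(sc.hadoopConfiguration)
val fileNames = fs.listStatus(new Path("HDFS_PATH"))
  .filter(_.isFile)
  .map(_.getPath.toString)

fileNames.foreach(doSomething)

Either way, the iteration has to happen on the driver (hence files.collect in the outline above): a SparkContext cannot be used inside an RDD transformation running on the executors, so the file names are gathered locally and one small job is launched per file.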

Expert Contributor

Thanks for your help; it partly helped. I was getting an "ArrayOutOfBound..." error while trying to iterate and couldn't fix it after debugging. I've added my code below. :)

Expert Contributor

@Joe Widen @Timothy Spann Why did I get a downvote here? My code is working! Nothing against you, but I just want to know if you can figure out the reason. Thanks!