Support Questions

Find answers, ask questions, and share your expertise

How to iterate multiple HDFS files in Spark-Scala using a loop?

Expert Contributor

Problem:

I want to iterate over multiple HDFS files that share the same schema under one directory. I don't want to load them all together, as the data is far too big.

What I Tried:

I tried a shell-script for loop, but on each iteration spark-submit takes 15-30 seconds to initialize and allocate cluster resources. My script has to run 900 times for now, so saving those 15-30 seconds per run adds up to several hours of pure overhead, since each job finishes in roughly 1 minute.

I looked all over for code that would list the HDFS files so I can iterate through them in Scala, instead of re-submitting the job each time from the shell script.


4 REPLIES


@Adnan Alvee

You could use the wholeTextFiles() method on SparkContext.

Here is a simple outline that will help you avoid the spark-submit for each file and thereby save you the 15-30 seconds per file by iterating over multiple files within the same job.

val data = sc.wholeTextFiles("HDFS_PATH")                     // RDD[(filename, content)]
val files = data.map { case (filename, content) => filename } // keep only the file names

def doSomething(file: String) = {
  println(file)

  // your logic of processing a single file comes here

  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // saving the processed RDD of a single file to HDFS comes here
}

// collect the file names to the driver and process them one by one
files.collect.foreach(filename => doSomething(filename))

where:

  • data - org.apache.spark.rdd.RDD[(String, String)]
  • files - org.apache.spark.rdd.RDD[String] - the file names
  • doSomething(filename) - your requirement/logic
  • HDFS_PATH - HDFS path to your source directory (you could even restrict the import to certain kinds of files by specifying the path as "/hdfspath/*.csv", as in the sketch below)
  • sc - SparkContext instance
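
For what it's worth, the glob-restricted form mentioned in the last bullet would look roughly like this (the path is just a placeholder, not from the original post):

// Restrict wholeTextFiles to CSV files directly under the directory (Hadoop glob pattern).
val csvData = sc.wholeTextFiles("/hdfspath/*.csv")            // RDD[(filename, content)]
val csvFiles = csvData.map { case (filename, _) => filename } // file names only
csvFiles.collect.foreach(println)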

Expert Contributor

Thanks for your help. It kind of helped, but I was getting an "ArrayOutOfBound..." error while trying to iterate over the files and couldn't fix it even after debugging. Added my code below. :)
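
(For anyone hitting the same thing: one common cause of that exception in this kind of code is indexing past the end of an array returned by split(), either on the path segments or on data lines with fewer fields than expected. Purely as an illustration, with the pipe delimiter and field count being assumptions and `file` standing for a single HDFS file path, a defensive version of the split/index step might look like:)

// Hypothetical guard: only keep lines that actually have the expected number
// of pipe-delimited fields before indexing into the split result.
val expectedFields = 2
val safeRows = sc.textFile(file)
  .map(_.split("\\|"))
  .filter(_.length >= expectedFields)   // malformed lines are dropped instead of throwing
  .map(cols => (cols(0), cols(1)))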

Expert Contributor

Finally found the solution. Here is the full code below.

Fire up a Spark shell, change the 'hadoopPath' below to your own HDFS path (one that contains several files/directories with the same schema), and see for yourself. It will convert each dataset to a DataFrame and print the table.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.hadoop.fs.{FileSystem, Path}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// schema shared by every file under the directory
case class Test(
  attr1: String,
  attr2: String
)

sc.setLogLevel("WARN")

// list every file/directory under the source path on HDFS
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/hadoopPath"))

def doSomething(file: String) = {

  println(file)

  // your logic of processing a single file comes here

  val x = sc.textFile(file)
  val classMapper = x.map(_.split("\\|"))
    .map(cols => Test(
      cols(0),
      cols(1)
    )).toDF()

  classMapper.show()
}

files.foreach(filename => {
  // the following code makes sure the "_SUCCESS" marker file is not processed
  val a = filename.getPath.toString()
  val name = filename.getPath.getName
  println("\nFILENAME: " + name)
  if (name == "_SUCCESS") {
    println("Cannot process '_SUCCESS' filename")
  } else {
    doSomething(a)
  }
})
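
If anyone adapting this wants to persist each result instead of printing it, the show() call could be swapped for a write. A minimal sketch, assuming a Parquet sink; the '/hadoopPath_out' output directory below is made up for illustration and not part of the original code:

def processAndSave(file: String): Unit = {
  val df = sc.textFile(file)
    .map(_.split("\\|"))
    .map(cols => Test(cols(0), cols(1)))
    .toDF()

  // Hypothetical sink: write each input file's DataFrame to its own output sub-directory.
  val outName = file.split("/").last
  df.write.mode("overwrite").parquet(s"/hadoopPath_out/$outName")
}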

Expert Contributor

@Joe Widen @Timothy Spann Why did I get a downvote here? My code is working! Nothing against you, but I just want to know if you could figure out the reason. Thanks!