Created 01-09-2017 09:44 PM
Problem:
I want to iterate over multiple HDFS files which have the same schema under one directory. I don't want to load them all together because the data is far too big.
What I Tried:
I tried a shell-script for loop, but for each iteration Spark-Submit takes 15-30 seconds to initialize and allocate cluster resources. My script has to run 900 times for now, so saving those 15-30 seconds adds up, since each job finishes in roughly a minute.
I looked all over for code that would list the HDFS files so I could iterate through them in Scala instead of re-submitting the job each time from the shell script.
Created 01-10-2017 12:21 AM
You could use wholeTextFiles() on the SparkContext, provided by the Spark Scala API.
Here is a simple outline that will help you avoid the spark-submit for each file and thereby save you the 15-30 seconds per file by iterating over multiple files within the same job.
val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)

  // your logic of processing a single file comes here

  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach(filename => {
  doSomething(filename)
})
where HDFS_PATH is the HDFS directory containing the files you want to process.
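One side note: wholeTextFiles() produces (filename, content) pairs, so the files' contents are read when the RDD is computed, even if only the names are kept. If the data really is too big for that, a minimal sketch that lists only the paths with the Hadoop FileSystem API (the directory below is a placeholder, not from the original post) could look like this:

import org.apache.hadoop.fs.{FileSystem, Path}

// List only the file paths; no file content is read at this point
val fs = FileSystem.get(sc.hadoopConfiguration)
val paths = fs.listStatus(new Path("/your/hdfs/dir")).map(_.getPath.toString)

// Reuse the same per-file processing function as above
paths.foreach(p => doSomething(p))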
Created 01-10-2017 06:51 PM
Thanks for your help. It kinda helped. I was getting an "ArrayOutOfBound..." error while trying to iterate over the files and couldn't fix it after debugging. Added my code below. :)
Created 01-10-2017 06:51 PM
Finally found the solution. Here is the full code below.
Fire up a spark shell, change 'hadoopPath' below to your own HDFS path that contains several other directories with the same schema, and see for yourself. It will convert each dataset to a DataFrame and print the table.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Schema of each pipe-delimited record
case class Test(attr1: String, attr2: String)

sc.setLogLevel("WARN")

import org.apache.hadoop.fs.{FileSystem, Path}

// List everything under the directory (each entry is a FileStatus)
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/hadoopPath"))

def doSomething(file: String) = {
  println(file)

  // your logic of processing a single file comes here
  val x = sc.textFile(file)
  val classMapper = x.map(_.split("\\|"))
    .map(x => Test(x(0).toString, x(1).toString))
    .toDF()
  classMapper.show()
}

files.foreach(filename => {
  // the following code makes sure the "_SUCCESS" file name is not processed
  val a = filename.getPath.toString()
  val m = a.split("/")
  val name = m(10) // index 10 matches the depth of this particular HDFS path
  println("\nFILENAME: " + name)
  if (name == "_SUCCESS") {
    println("Cannot Process '_SUCCESS' Filename")
  } else {
    doSomething(a)
  }
})
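If the directory depth varies, the hard-coded split index above breaks. A small variation (not part of the original answer) that asks the Path for its name directly would be:

files.foreach { status =>
  // getName avoids hard-coding a split index tied to the path depth
  val name = status.getPath.getName
  if (name == "_SUCCESS") {
    println("Skipping '_SUCCESS' marker file")
  } else {
    doSomething(status.getPath.toString)
  }
}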
Created 02-17-2017 06:48 PM
@Joe Widen @Timothy Spann Why did I get a down vote here? My code is working!!! Nothing against you but just want to know if you could figure out the reason! Thanks!