Support Questions

How can I read all files in a directory using scala

SOLVED

How can I read all files in a directory using scala

Rising Star

I have one CSV (comma-separated) file and one PSV (pipe-separated) file in the same directory, /data/dev/spark.

How can I read each file and convert it to its own dataframe using Scala?

1 ACCEPTED SOLUTION

Re: How can I read all files in a directory using scala

Expert Contributor

Hi @Dinesh Das, the following code was tested in spark-shell with Scala and works with both PSV and CSV data.

These are the datasets I used, both in the same directory:

/data/dev/spark

file1.csv

1,2,3 
x,y,z
a,b,c

file2.psv

q|w|e
1|2|3

To test, you can copy and paste my code into spark-shell (copy only a few lines/functions at a time; do not paste all the code at once into the shell).

    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.sql.functions.broadcast
    import org.apache.spark.sql.types._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // --EDIT YOUR SCHEMA HERE
    case class refLineID(
      attr1:String,
      attr2:String,
      attr3:String
    )

    import org.apache.hadoop.fs.{FileSystem, Path}

    // list the files under the directory
    val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/data/dev/spark"))

    // function to check the delimiter of a file by inspecting its first row
    def checkDelim(file: String): String = {
      val firstRow = sc.textFile(file).take(1).mkString("") // grab the first row to check the delimiter
      val pipe = "\\|"
      val comma = "\\,"
      var delim = ""
      for (c <- firstRow) {
        if (c == ',') delim = comma
        else if (c == '|') delim = pipe
      }
      delim
    }

    // -- Function to convert an RDD to a dataframe after checking the delimiter
    def convertToDF(file: String) = {

      val delim = checkDelim(file) // grab the delimiter by calling the function above

      // pass the file and delimiter type to transform the rows into a dataframe
      val x_df = sc.textFile(file)
                   .map(_.split(delim))
                   .map(a => refLineID(a(0), a(1), a(2)))
                   .toDF
      x_df.show()
    }

    // -- Loop through each file and call the function 'convertToDF'
    files.foreach(status => {
      val path = status.getPath.toString
      convertToDF(path)
    })

Note:

I'm using Spark 1.6 and Scala.

I use a function called "checkDelim", which checks the delimiter of the first row of each file under the directory.

The "convertToDF" function then knows how to split the rows and converts the data into a dataframe.

Pretty simple!
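
If you want to hold on to each DataFrame rather than just show it, here is a minimal sketch of the same idea (it reuses the checkDelim, refLineID and files values defined above; the loadDF name and the Map are just for illustration):

    // returns the DataFrame instead of only showing it
    def loadDF(file: String): DataFrame = {
      val delim = checkDelim(file)
      sc.textFile(file)
        .map(_.split(delim))
        .map(a => refLineID(a(0), a(1), a(2)))
        .toDF
    }

    // map from file path to its own DataFrame
    val dfByFile: Map[String, DataFrame] = files.map { status =>
      val path = status.getPath.toString
      path -> loadDF(path)
    }.toMap

    dfByFile.foreach { case (path, df) => df.show() }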

6 REPLIES

Re: How can I read all files in a directory using scala

New Contributor

With Spark 2:

Generate test files:

echo "1,2,3" > /tmp/test.csv
echo "1|2|3" > /tmp/test.psv

Read csv:

scala> val t = spark.read.csv("/tmp/test.csv")
t: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> t.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

Read psv:

scala> val p = spark.read.option("delimiter","|").csv("/tmp/test.psv")
p: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> p.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

You can also read from "/tmp/test*.csv", but that will read all matching files into the same dataset.
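
For example, a quick sketch of the glob read (the source column is just an illustration using input_file_name()):

    // all files matching the glob land in one DataFrame
    val all = spark.read.csv("/tmp/test*.csv")

    // input_file_name() records which source file each row came from
    import org.apache.spark.sql.functions.input_file_name
    all.withColumn("source", input_file_name()).show(false)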

For older versions of Spark you can use https://github.com/databricks/spark-csv.
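
With that package, a pipe-separated read would look roughly like this (the --packages coordinates below are only an example):

    // start the shell with e.g. --packages com.databricks:spark-csv_2.10:1.5.0
    val p = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("delimiter", "|")
      .load("/tmp/test.psv")
    p.show()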

Re: How can I read all files in a directory using scala

Rising Star
@melek

Here I am trying for a single function that will read all the files in a directory and act according to each file's type. Each file will go through an if condition:

if CSV, then split on comma; else split on pipe.
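
Roughly along those lines, a sketch of such a function (Spark 2 syntax; the extension check and the readAny name are my assumptions):

    import org.apache.spark.sql.DataFrame

    // pick the delimiter from the file extension, then read with it
    def readAny(path: String): DataFrame = {
      val delim = if (path.endsWith(".csv")) "," else "|"
      spark.read.option("delimiter", delim).csv(path)
    }

    val csvDF = readAny("/data/dev/spark/file1.csv")
    val psvDF = readAny("/data/dev/spark/file2.psv")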

Re: How can I read all files in a directory using scala

It's better to use different file extensions and path patterns for each format, e.g. .csv and .pipe, so that each becomes its own RDD. Spark parallelises based on the number of sources; .csv files aren't splittable, so the maximum number of executors you get depends on the file count.
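
For instance (paths and patterns assumed):

    // one DataFrame per format, driven by the path pattern
    val commaDF = spark.read.csv("/data/dev/spark/*.csv")
    val pipeDF  = spark.read.option("delimiter", "|").csv("/data/dev/spark/*.pipe")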

Tip: use the inferSchema option to scan through a reference CSV file, look at the output, and then convert that to a hard-coded schema. The inference process involves a scan through the entire file and is not something you want to repeat for a stable CSV format.
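
For example, a sketch of that workflow (the column names are just placeholders):

    // one-off: infer, inspect, then copy the result into an explicit schema
    val sample = spark.read.option("inferSchema", "true").csv("/data/dev/spark/file1.csv")
    sample.printSchema()

    import org.apache.spark.sql.types._
    val schema = StructType(Seq(
      StructField("attr1", StringType),
      StructField("attr2", StringType),
      StructField("attr3", StringType)))

    // day-to-day: read with the hard-coded schema, no full scan needed
    val fixed = spark.read.schema(schema).csv("/data/dev/spark/*.csv")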

Re: How can I read all files in a directory using scala

Expert Contributor

Hi, @Dinesh Das

Could you try something like the following?

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(StructField("c1", StringType), StructField("c2", StringType), StructField("c3", StringType)))
scala> val rows = sc.textFile("/data/csvpsv").map(_.split("[,|]")).map(cols => Row(cols(0), cols(1), cols(2)))
scala> spark.createDataFrame(rows, schema).show
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  1|  2|  3|
|  1|  2|  3|
+---+---+---+


Re: How can I read all files in a directory using scala

New Contributor
val path = "adl://azuredatalakestore.net/xxx/Budget/*.xlsx"

val sc = spark.sparkContext

// wholeTextFiles returns an RDD of (filename, content) pairs
val data = sc.wholeTextFiles(path)

// collect the file names into an array (index 0 is left unused)
var z: Array[String] = new Array[String](7)
var i = 1
val files = data.map { case (filename, content) => filename }
files.collect.foreach(filename => {
  println(i + "->" + filename)
  z(i) = filename
  println(z(i))
  i = i + 1
})
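
For reference, the same list of file names can also be collected without the mutable array and counter, e.g.:

    // keys of the (filename, content) pairs are the file names
    val fileNames = data.keys.collect()
    fileNames.zipWithIndex.foreach { case (f, idx) => println((idx + 1) + "->" + f) }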