Support Questions

How can I read all files in a directory using Scala?

Expert Contributor

I have one CSV (comma-separated) file and one PSV (pipe-separated) file in the same directory, /data/dev/spark.

How can I read each file and convert it to its own DataFrame using Scala?

1 ACCEPTED SOLUTION

Expert Contributor

Hi @Dinesh Das, the following code was tested in spark-shell with Scala and works with both PSV and CSV data.

The following are the datasets I used, all from the same directory:

/data/dev/spark

file1.csv

1,2,3 
x,y,z
a,b,c

file2.psv

q|w|e
1|2|3

To test, you can copy and paste my code into spark-shell (copy only a few lines/functions at a time; do not paste all of the code at once).

    import org.apache.spark.sql.types._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.apache.hadoop.fs.{FileSystem, Path}

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // -- EDIT YOUR SCHEMA HERE
    case class refLineID(
      attr1: String,
      attr2: String,
      attr3: String
    )

    // list all files under the directory
    val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/data/dev/spark"))

    // check the delimiter of a file by inspecting its first row
    def checkDelim(file: String): String = {
      val firstRow = sc.textFile(file).take(1).mkString("")
      val pipe = "\\|" // escaped because String.split takes a regex
      val comma = ","
      var delim = ""
      for (c <- firstRow) {
        if (c == ',') {
          delim = comma
        } else if (c == '|') {
          delim = pipe
        }
      }
      delim
    }

    // convert a file to a DataFrame after checking its delimiter
    def convertToDF(file: String): Unit = {
      val delim = checkDelim(file) // grab the delimiter for this file

      // split each row on the detected delimiter and map it onto the case class
      val x_df = sc.textFile(file)
                   .map(_.split(delim))
                   .map(a => refLineID(a(0), a(1), a(2)))
                   .toDF()
      x_df.show()
    }

    // loop through each file and call 'convertToDF'
    files.foreach { status =>
      convertToDF(status.getPath.toString)
    }

Note:

I'm using Spark 1.6 and Scala.

The "checkDelim" function checks the delimiter of the first row of each file under the directory.

The "convertToDF" function then knows how to split the rows and converts the data into a DataFrame.

Pretty simple!


6 REPLIES

Contributor

With Spark 2:

Generate test files:

echo "1,2,3" > /tmp/test.csv
echo "1|2|3" > /tmp/test.psv

Read csv:

scala> val t = spark.read.csv("/tmp/test.csv")
t: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> t.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

Read psv:

scala> val p = spark.read.option("delimiter","|").csv("/tmp/test.psv")
p: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> p.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

You can also read from "/tmp/test*.csv", but that will read multiple files into the same dataset.
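
For example, if there were several comma-separated files matching the pattern (the glob here is just illustrative), they would all be unioned into one DataFrame:

scala> // every file matching the glob lands in the same DataFrame
scala> val all = spark.read.csv("/tmp/test*.csv")
scala> all.count()   // total row count across all matched files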

For older versions of Spark you can use: https://github.com/databricks/spark-csv
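
A rough sketch of how that package is typically used on Spark 1.x (assuming spark-csv is on the classpath, e.g. the shell was started with --packages com.databricks:spark-csv_2.10:1.5.0):

    // Spark 1.x: read a pipe-separated file through the spark-csv data source
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val p = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("delimiter", "|")
      .option("header", "false")
      .load("/tmp/test.psv")
    p.show()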

Expert Contributor
@melek

Here I am trying for a single function which will read all the files in a directory and take action according to each file's type. Each file will go through an if condition:

If CSV, then split on a comma, else on a pipe.
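
One possible way to write that as a single function, sketched here with the Spark 2 reader (the readAsDF name and the extension rule are just illustrative assumptions; this requires a Spark 2.x session):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.hadoop.fs.{FileSystem, Path}

    // sketch: choose the delimiter from the file extension, then read with spark.read
    def readAsDF(spark: SparkSession, path: String): DataFrame = {
      // assumed rule: ".csv" means comma-separated, everything else is pipe-separated
      val delimiter = if (path.toLowerCase.endsWith(".csv")) "," else "|"
      spark.read.option("delimiter", delimiter).csv(path)
    }

    // usage: list the directory and build one DataFrame per file
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(new Path("/data/dev/spark"))
      .filter(_.isFile)
      .map(status => readAsDF(spark, status.getPath.toString))
      .foreach(_.show())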

avatar

Better to use different file extensions and patterns for each, e.g. .csv and .pipe, so that each becomes its own RDD. Spark parallelises based on the number of sources; .csv files aren't splittable, so the maximum number of executors you get depends on the file count.
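
For example, with distinct extensions each pattern can be read into its own DataFrame (a small sketch assuming Spark 2.x and the .csv/.pipe naming suggested above):

    // each glob pattern becomes its own DataFrame, parallelised over its own set of files
    val commaDF = spark.read.csv("/data/dev/spark/*.csv")
    val pipeDF  = spark.read.option("delimiter", "|").csv("/data/dev/spark/*.pipe")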

Tip: use the inferSchema option to scan through a reference CSV file, look at the output, and then convert that into a hard-coded schema. The inference process involves a scan through the entire file and is not something you want to repeat on a stable CSV format.
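
A small sketch of that workflow (the path and column names are illustrative, assuming Spark 2.x):

    import org.apache.spark.sql.types._

    // one-off: let Spark infer the schema from a reference file and inspect it
    val inferred = spark.read.option("inferSchema", "true").csv("/data/dev/spark/file1.csv")
    inferred.printSchema()

    // afterwards: hard-code the schema so regular reads skip the extra full scan
    val schema = StructType(Seq(
      StructField("attr1", StringType),
      StructField("attr2", StringType),
      StructField("attr3", StringType)
    ))
    val df = spark.read.schema(schema).csv("/data/dev/spark/file1.csv")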

Expert Contributor

Hi, @Dinesh Das

Could you try something like the following?

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
scala> spark.createDataFrame(sc.textFile("/data/csvpsv").map(_.split("[,|]")).map(cols => Row(cols(0),cols(1),cols(2))), StructType(Seq(StructField("c1", StringType), StructField("c2", StringType), StructField("c3", StringType)))).show
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  1|  2|  3|
|  1|  2|  3|
+---+---+---+


New Contributor
val path = "adl://azuredatalakestore.net/xxx/Budget/*.xlsx"

val sc = spark.sparkContext

val data = sc.wholeTextFiles(path)

// collect the matched file names into an array (size 7 assumes at most 6 files, indexed from 1)
var z: Array[String] = new Array[String](7)
var i = 1
val files = data.map { case (filename, content) => filename }
files.collect.foreach(filename => {
  println(i + "->" + filename)
  z(i) = filename
  println(z(i))
  i = i + 1
})