How can I read all files in a directory using Scala?
Apache Spark
Created ‎02-16-2017 09:11 AM
I have one CSV (comma-separated) file and one PSV (pipe-separated) file in the same directory, /data/dev/spark.
How can I read each file and convert it to its own DataFrame using Scala?
Created ‎02-16-2017 09:48 AM
With Spark 2:
Generate test files:
echo "1,2,3" > /tmp/test.csv
echo "1|2|3" > /tmp/test.psv
Read csv:
scala> val t = spark.read.csv("/tmp/test.csv")
t: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> t.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+
Read psv:
scala> val p = spark.read.option("delimiter","|").csv("/tmp/test.psv")
p: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> p.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+

You can also read from a glob such as "/tmp/test*.csv", but that reads all matching files into the same dataset.
For older versions of Spark you can use the spark-csv package: https://github.com/databricks/spark-csv
Created ‎02-16-2017 09:56 AM
I am trying to write a single function that reads all the files in a directory and handles each one according to its type. Each file would go through an if condition:
if it is a CSV file, split on commas; otherwise split on pipes.
Created ‎02-16-2017 10:07 AM
It is better to use different file extensions and patterns for each format, e.g. .csv and .pipe, so that each becomes its own RDD. Spark parallelises based on the number of sources; .csv files aren't splittable, so the maximum number of executors you get depends on the file count.
Tip: use the inferSchema option to scan through a reference CSV file, look at the output, and then convert that to a hard-coded schema. The inference process involves a scan through the entire file, and is not something you want to repeat on a stable CSV format.
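As a rough sketch of that tip, assuming the /tmp/test.csv file generated earlier in this thread (three integer columns), the one-off inference step and the hard-coded schema could look like the following. The object and method names are made up for illustration, and the column types are an assumption based on that sample file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical helper object, not part of any library.
object SchemaSketch {
  // One-off step: let Spark scan the reference file and print what it inferred.
  def printInferred(spark: SparkSession, path: String): Unit =
    spark.read.option("inferSchema", "true").csv(path).printSchema()

  // Hard-coded schema written down after inspecting the printed output.
  // IntegerType is assumed here because the sample file holds 1,2,3.
  val schema: StructType = StructType(Seq(
    StructField("_c0", IntegerType),
    StructField("_c1", IntegerType),
    StructField("_c2", IntegerType)
  ))

  // Regular reads pass the schema explicitly, so no inference scan is needed.
  def readTyped(spark: SparkSession, path: String): DataFrame =
    spark.read.schema(schema).csv(path)
}
```

In spark-shell, where `spark` is already defined, this would be called as `SchemaSketch.printInferred(spark, "/tmp/test.csv")` once, and `SchemaSketch.readTyped(spark, "/tmp/test.csv")` afterwards.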
Created ‎02-16-2017 06:51 PM
Hi, @Dinesh Das
Could you try something like the following?
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
scala> spark.createDataFrame(sc.textFile("/data/csvpsv").map(_.split("[,|]")).map(cols => Row(cols(0),cols(1),cols(2))), StructType(Seq(StructField("c1", StringType), StructField("c2", StringType), StructField("c3", StringType)))).show
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  1|  2|  3|
|  1|  2|  3|
+---+---+---+
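Splitting on both delimiters at once puts every row into a single DataFrame, and it would mis-split a CSV field that happens to contain a pipe. If each file should become its own DataFrame, a minimal sketch of the "if csv then comma, else pipe" idea is to list the directory and pick the delimiter from the file extension. The object and method names below are hypothetical, and the sketch assumes every file that does not end in .csv is pipe-separated.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, not part of any library.
object ReadByExtension {
  // Comma for .csv files; anything else is assumed to be pipe-separated.
  def delimiterFor(fileName: String): String =
    if (fileName.toLowerCase.endsWith(".csv")) "," else "|"

  // Returns one DataFrame per file in the directory, keyed by file name.
  def readAll(spark: SparkSession, dir: String): Map[String, DataFrame] = {
    val dirPath = new Path(dir)
    // Resolve the file system (local, HDFS, ...) from the path and the Hadoop configuration.
    val fs = dirPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(dirPath)
      .filter(_.isFile)
      .map(_.getPath)
      .map { p =>
        p.getName -> spark.read.option("delimiter", delimiterFor(p.getName)).csv(p.toString)
      }
      .toMap
  }
}
```

With the directory from the question, `ReadByExtension.readAll(spark, "/data/dev/spark")` would return a map from which each DataFrame can be looked up by file name.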
Created ‎02-07-2019 03:11 PM
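val path = "adl://azuredatalakestore.net/xxx/Budget/*.xlsx"
val sc = spark.sparkContext

// wholeTextFiles yields (filename, content) pairs; only the filenames are used here.
val data = sc.wholeTextFiles(path)
val files = data.map { case (filename, _) => filename }

// Collect the filenames to the driver rather than filling a fixed-size array,
// which would fail with an index error once more files match than it has slots.
val z: Array[String] = files.collect()
z.zipWithIndex.foreach { case (filename, idx) =>
  println(s"${idx + 1}->$filename")
}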