Created 02-16-2017 09:11 AM
I have 1 CSV (comma-separated) file and 1 PSV (pipe-separated) file in the same directory, /data/dev/spark.
How can I read each file and convert each one to its own dataframe using Scala?
Created 02-20-2017 10:44 PM
Hi @Dinesh Das, the following code was tested in spark-shell with Scala and works perfectly with PSV and CSV data.
The following are the datasets I used, all from the same directory:
/data/dev/spark
file1.csv
1,2,3
x,y,z
a,b,c
file2.psv
q|w|e
1|2|3
To test, you can copy and paste my code into the spark-shell (copy only a few lines/functions at a time; do not paste all the code at once).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// -- EDIT YOUR SCHEMA HERE
case class refLineID(
  attr1: String,
  attr2: String,
  attr3: String
)

import org.apache.hadoop.fs.{FileSystem, Path}
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/data/dev/spark"))

// -- Function to check the delimiter of each file
def checkDelim(file: String): String = {
  val x = sc.textFile(file)
  val grab_x = x.take(1) // grab the first row to check the delimiter
  val str = grab_x.mkString("")
  val pipe = "\\|"
  val comma = "\\,"
  var delim = ""
  for (c <- str) {
    if (c == ',') {
      delim = comma
    } else if (c == '|') {
      delim = pipe
    }
  }
  return delim
}

// -- Function to convert an RDD to a dataframe after checking the delimiter
def convertToDF(file: String) = {
  val delim = checkDelim(file) // grab the delimiter by calling the function above
  val x = sc.textFile(file)
  // pass the file and delimiter type to transform into a dataframe
  val x_df = x.map(_.split(delim))
    .map(a => refLineID(
      a(0).toString,
      a(1).toString,
      a(2).toString
    )).toDF
  x_df.show()
}

// -- Loop through each file and call the function 'convertToDF'
files.foreach(filename => {
  val a = filename.getPath.toString()
  convertToDF(a)
})
Note:
I'm using Spark 1.6 and Scala.
I am using a function called "checkDelim", which checks the delimiter of the first row of each file under the directory.
The "convertToDF" function then knows how to split the rows and converts the data into a dataframe.
Pretty simple!
Created 02-16-2017 09:48 AM
With Spark 2:
Generate test files:
echo "1,2,3" > /tmp/test.csv echo "1|2|3" > /tmp/test.psv
Read CSV:
scala> val t = spark.read.csv("/tmp/test.csv")
t: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> t.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+
Read PSV:
scala> val p = spark.read.option("delimiter","|").csv("/tmp/test.psv")
p: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

scala> p.show()
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  2|  3|
+---+---+---+
You can also read from "/tmp/test*.csv", but that will read multiple files into the same dataset.
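For example, a minimal sketch (the glob path is just illustrative):

// Every file matched by the glob ends up in a single DataFrame
val all = spark.read.csv("/tmp/test*.csv")
all.show()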
For older versions of Spark you can use: https://github.com/databricks/spark-csv
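For instance, a minimal Spark 1.x sketch with the spark-csv package (assuming it is on the classpath, e.g. spark-shell started with --packages com.databricks:spark-csv_2.10:1.5.0; the path is illustrative):

// Read a pipe-separated file through the databricks spark-csv data source
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val psv = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .load("/tmp/test.psv")
psv.show()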
Created 02-16-2017 09:56 AM
Here I am trying to write a single function that reads every file in a directory and acts according to each file's type; each file goes through an if condition.
If it's a CSV, split on comma; else split on pipe.
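Roughly what I have in mind (a sketch only, assuming Spark 2's spark.read API and that the files keep their .csv/.psv extensions; readByExtension is just an illustrative name):

// Pick the delimiter from the file extension, then read each file into its own dataframe
def readByExtension(path: String): org.apache.spark.sql.DataFrame = {
  val delim = if (path.toLowerCase.endsWith(".csv")) "," else "|"
  spark.read.option("delimiter", delim).csv(path)
}

val csvDF = readByExtension("/data/dev/spark/file1.csv")
val psvDF = readByExtension("/data/dev/spark/file2.psv")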
Created 02-16-2017 10:07 AM
It's better to use a different file extension and pattern for each format, e.g. .csv and .pipe, so each becomes its own RDD. Spark parallelises based on the number of sources; .csv files aren't splittable, so the maximum number of executors you get depends on the file count.
Tip: use the inferSchema option to scan through a reference CSV file, look at the output, and then convert that into a hard-coded schema. The inference process involves a scan through the entire file and is not something you want to repeat on a stable CSV format.
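A minimal sketch of that workflow (Spark 2 API; the path and column types are illustrative):

// One-off: let Spark infer the schema from a reference file and inspect it
val inferred = spark.read.option("inferSchema", "true").csv("/tmp/test.csv")
inferred.printSchema()

// Then hard-code what the inference reported and reuse it for every subsequent read
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("_c0", IntegerType),
  StructField("_c1", IntegerType),
  StructField("_c2", IntegerType)))
val df = spark.read.schema(schema).csv("/tmp/test.csv")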
Created 02-16-2017 06:51 PM
Hi, @Dinesh Das
Could you try something like the following?
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
scala> spark.createDataFrame(
         sc.textFile("/data/csvpsv").map(_.split("[,|]")).map(cols => Row(cols(0), cols(1), cols(2))),
         StructType(Seq(
           StructField("c1", StringType),
           StructField("c2", StringType),
           StructField("c3", StringType)))).show
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  1|  2|  3|
|  1|  2|  3|
+---+---+---+
Created 02-07-2019 03:11 PM
val path = "adl://azuredatalakestore.net/xxx/Budget/*.xlsx" val sc = spark.sparkContext
val data = sc.wholeTextFiles(path)
var z: Array[String] = new Array[String](7) var i=1 val files = data.map { case (filename, content) => filename }
files.collect.foreach(filename => {
println(i + "->" + filename)
z(i) = filename println(z(i))
i = i + 1})