Created 02-23-2017 01:49 PM
Hi guys,
I'm new to Spark and Scala. I have several CSV files that I want to merge into one CSV file or DataFrame; I just want to handle them as if they were a single file.
Any help? Thanks.
Created 02-23-2017 03:46 PM
For Spark 1.6+
What you need to do is load all the CSV files in a loop, batch-processing style. Inject the same schema into each of them, convert each one to a DataFrame, and union each DataFrame into an accumulator variable. That way, all of them become just one DataFrame. The following code does the work; you can follow it and test it in spark-shell.
Contents of file1.csv
x,y,z
Contents of file2.csv
a,b,c
c,d,e
Store them in an HDFS directory and change the path to match yours in the following code where it says 'hadoopPath'.
NOTE: While working in spark-shell, don't paste all the code at once; it sometimes yields errors. Paste one chunk at a time.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.hadoop.fs.{FileSystem, Path}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// -- EDIT YOUR SCHEMA HERE: one field per CSV column
case class Test(
  attr1: String,
  attr2: String,
  attr3: String
)

// Initialize an empty DataFrame (this will be the final one where we union all others)
var all_df = Seq.empty[Test].toDF

// -- EDIT YOUR HDFS PATH HERE in place of 'hadoopPath'
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/hadoopPath/"))

// -- All operations to be executed for each file
def convertToDFandUnion(file: String) = {
  val x = sc.textFile(file)
  // Split each line on commas and map the fields into the case class
  val x_df = x.map(_.split(","))
    .map(a => Test(a(0), a(1), a(2)))
    .toDF
  x_df.show()
  // This is where we union each file's DataFrame into the accumulator
  all_df = all_df.unionAll(x_df)
  all_df.show()
}

// -- Loop through each file and call the function 'convertToDFandUnion'
files.foreach(filename => {
  val a = filename.getPath.toString()
  convertToDFandUnion(a)
})
Created 02-24-2017 09:09 AM
Thanks, but the problem is that I don't know the schema of the CSV files, so I can't initialize x_df. Any help please? Thank you.
Created 02-24-2017 05:50 PM
You don't need the schema, as long as you know the number of columns. In my code I put attr1, attr2, attr3 because I had 3 columns in the data. If you have 15 columns, for example, you can go from attr1, attr2, ... up to attr15.
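If you'd rather not hardcode a case class at all, here is a minimal sketch (assuming spark-shell with sc and sqlContext in scope; loadWithGeneratedSchema is a hypothetical helper name, not from the code above) that generates an attr1..attrN schema programmatically from the column count:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Hypothetical helper: builds a DataFrame for a file with numCols columns,
// naming them attr1..attrN instead of relying on a hardcoded case class.
def loadWithGeneratedSchema(path: String, numCols: Int): DataFrame = {
  val schema = StructType(
    (1 to numCols).map(i => StructField(s"attr$i", StringType, nullable = true))
  )
  val rows = sc.textFile(path)
    .map(_.split(",", -1))                     // limit -1 keeps trailing empty fields
    .map(a => Row.fromSeq(a.take(numCols).toSeq))
  sqlContext.createDataFrame(rows, schema)
}

// Usage: union the result into all_df exactly as before, e.g.
// all_df = all_df.unionAll(loadWithGeneratedSchema("/hadoopPath/file1.csv", 3))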
Created 02-24-2017 08:40 AM
The data are in the local file system, and the files all have the same header. I want to get one CSV file with this header. Is there a solution using spark-csv or anything else? I want to loop over the files and merge them. Any solution please? Thanks.
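For that local-filesystem case, a minimal sketch assuming the Databricks spark-csv package is available on Spark 1.6 (e.g. started with spark-shell --packages com.databricks:spark-csv_2.10:1.5.0) and using hypothetical paths:

// header=true treats the first line of each file as column names,
// so the shared header is read once instead of appearing as data rows.
val merged = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///path/to/input/*.csv")

// coalesce(1) funnels everything into a single partition so the output
// directory contains one part file with a single header line on top.
merged.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("file:///path/to/output")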
Created 02-24-2017 05:44 PM