
Merge csv files in one file

Explorer

Hi guys,

I'm new to Spark and Scala. I have several csv files that I want to merge into a single csv file or dataframe; I just want to handle them as if they were only one file.

Any help is appreciated, thanks

5 REPLIES

Re: Merge csv files in one file

Expert Contributor

Hi @Maher Hattabi

For Spark 1.6+

What you need to do is load all the csv files with a for loop, in a batch-processing manner. As you inject the same schema into each of them, convert each to a dataframe and union it into another var; that way, all of them end up as just one dataframe. The following code does the work; you can follow it and test it in spark-shell.

Contents of file1.csv

x,y,z

Contents of file2.csv

a,b,c

c,d,e

Store them in an hdfs directory, and in the following code change the path where it says 'hadoopPath' to match yours.

NOTE: While working in spark-shell, don't paste all the code at once; it sometimes yields errors. Paste one bunch at a time.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.apache.hadoop.fs.{FileSystem, Path}

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // -- EDIT YOUR SCHEMA HERE
    case class Test(
      attr1: String,
      attr2: String,
      attr3: String
    )

    /*
    Initialize an empty dataframe.
    (This dataframe will be the final one where we union all the others.)
    */
    var all_df = Seq.empty[Test].toDF

    // -- EDIT YOUR HDFS PATH HERE 'hadoopPath'
    val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/hadoopPath/"))

    // -- Function for all operations to be executed on each file
    def convertToDFandUnion(file: String) = {
      // Read the file, split each line on commas, and map the fields onto the case class
      val x = sc.textFile(file)
      val x_df = x.map(_.split(","))
                  .map(a => Test(
                    a(0),
                    a(1),
                    a(2)
                  )).toDF
      x_df.show()

      // This is where we union each dataframe into the final one
      all_df = all_df.unionAll(x_df)
      all_df.show()
    }

    // -- Loop through each file and call the function 'convertToDFandUnion'
    files.foreach(filename => {
      val a = filename.getPath.toString()
      convertToDFandUnion(a)
    })
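
Once the loop finishes, all_df holds the rows of every file. A minimal follow-on sketch, assuming you also want the merged result persisted as a single csv file ('/hadoopPath/merged' is a placeholder output path to edit):

    // Collapse to one partition so a single part file is produced,
    // then write each Row back out as a comma-separated line
    all_df.coalesce(1)
          .rdd
          .map(_.mkString(","))
          .saveAsTextFile("/hadoopPath/merged")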


Re: Merge csv files in one file

Explorer
@Adnan Alvee

Thanks, but the problem is that I do not know the schema of the csv files, so I can't initialize x_df. Any help please, thank you.


Re: Merge csv files in one file

Expert Contributor

You don't need the schema, as long as you know the number of columns. In my code I put attr1, attr2, attr3 because I had 3 columns in the data. If you have 15 columns, for example, you can go from attr1, attr2 ... up to attr15, etc. If you'd rather not type them all out, see the sketch below.
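
One way to avoid hand-writing the case class is to build the schema programmatically from the column count. A minimal sketch, assuming plain string columns (numCols and the file path here are placeholders to edit):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Placeholder: set this to your real column count
    val numCols = 3

    // Generate StructFields attr1..attrN, all strings
    val schema = StructType((1 to numCols).map(i => StructField(s"attr$i", StringType, nullable = true)))

    // Split on commas (limit -1 keeps trailing empty fields) and wrap each line in a Row
    val rows = sc.textFile("/hadoopPath/file1.csv").map(_.split(",", -1).toSeq).map(Row.fromSeq)
    val df = sqlContext.createDataFrame(rows, schema)
    df.show()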


Re: Merge csv files in one file

Explorer

The data are in the local file system and all the files have the same header. I want to get one csv file with this header. Is there a solution using spark-csv or anything else? I want to loop over the files and merge them; any solution please, thanks.
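
For later readers, a minimal sketch of the spark-csv route, assuming the Databricks spark-csv package is available (e.g. spark-shell --packages com.databricks:spark-csv_2.10:1.5.0) and that file:///localPath/ stands in for your local csv directory:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Placeholder: list the csv files in your local directory
    val files = FileSystem.getLocal(sc.hadoopConfiguration)
      .listStatus(new Path("file:///localPath/"))
      .map(_.getPath.toString)

    // Let spark-csv consume each file's header, then union everything into one dataframe
    val merged = files.map { f =>
      sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
    }.reduce(_ unionAll _)

    // coalesce(1) so the output directory contains a single csv part file,
    // written with the shared header
    merged.coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("file:///localPath/merged")

coalesce(1) forces a single output part file; for very large merged data you would drop it and accept multiple part files.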


Re: Merge csv files in one file

Mentor