Support Questions

Find answers, ask questions, and share your expertise

Reading multiple csv files without headers using spark

avatar
Expert Contributor

Dear community,

I am trying to read multiple csv files using Apache Spark. However it omits only header in a first file.

Code using databricks and just filtering header:

String Files = "/path/to/files/*.csv";
SparkConf sConf = new SparkConf().setAppName("Some task");
sConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator", KryoClassRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(sConf);

SQLContext sqlContext = new SQLContext(sc);
DataFrame MyDataSet = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(Files);
String Files = "/path/to/files/*.csv";
SparkConf sConf = new SparkConf().setAppName("Some task");
sConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator", KryoClassRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(sConf);

// filter header
JavaRDD<String> textFromFileWhole = sc.textFile(Files);
            final String header = textFromFileWhole.first();
            JavaRDD<String> textFromFile = textFromFileWhole.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) throws Exception {
                    return !s.equalsIgnoreCase(header);
                }
            });
// work with file

In both variants the header is omitted only in first file.

1 ACCEPTED SOLUTION

avatar
Super Collaborator

If you use the second approach, instead of sc.textFile, you could use sc.wholeTextFiles. Then with a map method call you could strip the headers. Then use flatMap to convert the value (whole text file per element) to the records. Then leverage spark-csv capabilities.

View solution in original post

5 REPLIES 5

avatar
Super Collaborator

If you use the second approach, instead of sc.textFile, you could use sc.wholeTextFiles. Then with a map method call you could strip the headers. Then use flatMap to convert the value (whole text file per element) to the records. Then leverage spark-csv capabilities.

avatar
Expert Contributor

Yes, this really works. Had forgotten about this method))) It would be also interesting to see how databricks works on that.

avatar
Expert Contributor

Found another approach with sc.wholeTextFiles. Just make flatmap on its result and make a class which checks for header in file.

avatar
Super Collaborator

wholeTextFiles was a nice approach, I have used the below approach after finding the issue with databricks library.

val files = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) 
val headerLessRDD = files.filter(f => f._1.get!=0).values.map { x => Row.fromSeq(x.toString().split(",")) }  
val header = files.filter(f => f._1.get==0).first()._2.toString()
val schema = StructType(header.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val dataFrame =sqlContext.createDataFrame(headerLessRDD, schema)

Basic idea was to read the file using TextInputFormat and skip the line if the start offset is 0

avatar
Expert Contributor

What about SPARK 2.0? Should I make sc.wholeTextFiles there also or there is some more intelligent way?