Created 07-28-2016 01:33 PM
Dear community,
I am trying to read multiple CSV files with Apache Spark, but the header is skipped only in the first file.
Code using the Databricks spark-csv package, and code that simply filters out the header:
String Files = "/path/to/files/*.csv";

SparkConf sConf = new SparkConf().setAppName("Some task");
sConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator", KryoClassRegistrator.class.getName());

JavaSparkContext sc = new JavaSparkContext(sConf);
SQLContext sqlContext = new SQLContext(sc);

DataFrame MyDataSet = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load(Files);
String Files = "/path/to/files/*.csv";

SparkConf sConf = new SparkConf().setAppName("Some task");
sConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sConf.set("spark.kryo.registrator", KryoClassRegistrator.class.getName());

JavaSparkContext sc = new JavaSparkContext(sConf);

// filter header
JavaRDD<String> textFromFileWhole = sc.textFile(Files);
final String header = textFromFileWhole.first();

JavaRDD<String> textFromFile = textFromFileWhole.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
        return !s.equalsIgnoreCase(header);
    }
});

// work with file
In both variants the header is omitted only in the first file.
Created 07-28-2016 03:02 PM
If you use the second approach, instead of sc.textFile you could use sc.wholeTextFiles. Then, with a map call, you could strip the header of each file; then use flatMap to convert each value (the whole text of one file per element) into records; then leverage spark-csv's capabilities.
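Roughly, the idea looks like this (a minimal Scala sketch of the suggestion, not tested code; the path is the same placeholder as above and splitting on "\n" is an assumption):

import org.apache.spark.{SparkConf, SparkContext}

// Read each CSV file as a whole, drop its first line (the header),
// then flatten the remaining lines back into a single RDD of records.
val sc = new SparkContext(new SparkConf().setAppName("Some task"))
val whole = sc.wholeTextFiles("/path/to/files/*.csv")   // RDD[(fileName, fileContent)]

val records = whole.flatMap { case (_, content) =>
  content.split("\n").drop(1)                            // drop the header of this particular file
}

// From here the lines can be split into fields or handed to spark-csv for parsing.
records.take(5).foreach(println)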
Created 07-28-2016 05:57 PM
Yes, this really works; I had forgotten about that method. It would also be interesting to see how the Databricks library handles this.
Created 07-29-2016 08:19 AM
Found another approach with sc.wholeTextFiles: just apply flatMap to its result with a class that checks for and drops the header of each file, as sketched below.
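For example (an illustrative Scala sketch, assuming sc is the existing SparkContext; HeaderSkipper is just a made-up name for the helper):

import org.apache.spark.rdd.RDD

// Illustrative helper: given the content of one whole file, return its lines without the header.
object HeaderSkipper extends Serializable {
  def dataLines(fileContent: String): Seq[String] = {
    val lines = fileContent.split("\n").toSeq
    if (lines.isEmpty) lines else lines.tail   // the first line of each file is its header
  }
}

val withoutHeaders: RDD[String] =
  sc.wholeTextFiles("/path/to/files/*.csv")
    .flatMap { case (_, content) => HeaderSkipper.dataLines(content) }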
Created 07-28-2016 06:16 PM
wholeTextFiles is a nice approach. I used the approach below after running into this issue with the Databricks library.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Read the files with TextInputFormat: the key is the byte offset of each line within its file.
val files = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Every line whose offset is not 0 is a data line; split it into fields.
val headerLessRDD = files.filter(f => f._1.get != 0)
  .values
  .map { x => Row.fromSeq(x.toString().split(",")) }

// The line at offset 0 is a header; use it to build the schema.
val header = files.filter(f => f._1.get == 0).first()._2.toString()
val schema = StructType(header.split(",").map(fieldName => StructField(fieldName, StringType, true)))

val dataFrame = sqlContext.createDataFrame(headerLessRDD, schema)
The basic idea is to read the files using TextInputFormat and skip a line whenever its byte offset within the file is 0, i.e. it is the first (header) line of that file.
Created 07-29-2016 08:20 AM
What about Spark 2.0? Should I use sc.wholeTextFiles there as well, or is there a more intelligent way?