Created 12-29-2017 06:40 AM
I am reading CSV files from S3 and writing them into a Hive table as ORC. The write produces a lot of small files, which I need to merge. I have the following properties set:
spark.sql("SET hive.merge.sparkfiles = true") spark.sql("SET hive.merge.mapredfiles = true") spark.sql("SET hive.merge.mapfiles = true") spark.sql("set hive.merge.smallfiles.avgsize = 128000000") spark.sql("set hive.merge.size.per.task = 128000000")
Apart from these merge settings, I tried repartition(1) and coalesce(1), which do merge the output into a single file, but the write deletes the Hive table and recreates it:
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtbl>)
If I use Append mode instead of Overwrite, it creates duplicate files under each partition:
masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtbl>)
In both cases the Spark job runs twice and fails on the second execution.
Is there any way I can use repartition/coalesce with Append mode without duplicating the part files in each partition?
Created 12-29-2017 06:43 PM
@R F I might suggest you spend some time in the documentation understanding the difference between SaveMode.Append and SaveMode.Overwrite. If you are appending, you are adding files to a table; if you are overwriting, you are deleting and then adding files to a table.
All of the settings you set are Hive map-reduce settings, not Spark settings, so they won't help until you actually run a map-reduce job. (Since you are using Spark, those settings won't help you.)
Could you provide the table definition and the version of Spark you are using? That might help in understanding what's happening.
I agree with your attempt to make larger files for performance, but is this meant as a long-term fix or a one-time cleanup? Spark is creating many partitions to make future work easier. You don't want one file when you have billions of rows. You should be writing by partition into this table; that will give you the best performance in the long run. (Even if it means small files now, writing will be more efficient later.)
I hope this helps, let me know if you need more clarity.
Created 12-29-2017 06:45 PM
I guess I should have said, it's because you are writing by partition that you are getting multiple small files...
Created 01-02-2018 08:36 AM
@Matt Andruff Thanks for responding. The question is not about the difference between SaveMode.Append and SaveMode.Overwrite; I am concerned about the Spark job failure that happens due to multiple executions of the same task while using repartition/coalesce.
Created 01-03-2018 04:44 PM
@R F - unless you are working with "small data", don't use
repartition(1)
If you are getting small files, it's because each existing Spark partition is writing to its own partition file. If you want one file per Hive partition you can use this:
masterFile.repartition(<partitioncolumn>).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtbl>)
This will repartition in Spark by the partition column, which ensures that only one Spark task writes into each partitionBy directory, i.e. one file per Hive partition. It's going to make the biggest files you could hope for.
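Spelled out with concrete (placeholder) names, note that repartition takes a Column while partitionBy takes the column name as a string:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

masterFile
  .repartition(col("load_date"))   // shuffle so all rows for a given load_date end up in one task
  .write
  .mode(SaveMode.Append)
  .partitionBy("load_date")        // one directory per load_date, now written by a single task
  .orc("/apps/hive/warehouse/my_db.db/my_table")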
Created 01-03-2018 04:49 PM
If there is a large amount of skew you are going to run into a new error... so be careful with this. You may prefer to go back to living with the small-file problem.
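One possible workaround for the skew case, if you can live with a few files per partition instead of exactly one, is a salting trick so the largest partitions are split across a handful of tasks. This is only a sketch; the column name, path, and salt count are placeholders:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, rand}

val filesPerPartition = 4   // assumed cap on files per Hive partition

masterFile
  .withColumn("salt", (rand() * filesPerPartition).cast("int"))  // random value 0..3
  .repartition(col("load_date"), col("salt"))                    // up to 4 tasks per load_date
  .drop("salt")                                                  // the salt is only needed for the shuffle
  .write
  .mode(SaveMode.Append)
  .partitionBy("load_date")
  .orc("/apps/hive/warehouse/my_db.db/my_table")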
I should mention that, if you want, you could use a Hive statement to 'rebuild' the table, which would invoke the settings you mentioned in the first post. That isn't ideal, as it's more efficient to just write the data correctly the first time. But if you want some magic, and this is a one-time import rather than a repeated ETL process, that might be an acceptable answer.