Created 05-11-2016 11:05 AM
Hi,
I am currently using Spark Streaming to write to an external Hive table every 30 minutes:
rdd.toDF().write.partitionBy("dt").options(options).format("orc").mode(SaveMode.Append).saveAsTable("table_name")
The issue with this is it creates lots of small files in HDFS, like so
part-00000 part-00000_copy_1
My table was created with transactions enabled, and I have enabled ACID transactions on the Hive instance. However, I can't see any compactions running, nor are any created when I force compaction with the ALTER TABLE command. I would expect compaction to run and merge these files, as they are very small (around 200 KB each).
Any ideas or help greatly appreciated.
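For reference, compaction only applies to transactional tables, so the table DDL and Hive settings would need to look roughly like the following (the column names and bucket count here are illustrative, not my exact DDL):

```sql
-- ACID tables must be bucketed ORC tables with the transactional property set
CREATE TABLE table_name (id INT, payload STRING)
PARTITIONED BY (dt STRING)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

-- On the Hive side, the compactor has to be running:
--   hive.support.concurrency = true
--   hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
--   hive.compactor.initiator.on = true
--   hive.compactor.worker.threads > 0

-- Forcing a compaction on one partition:
ALTER TABLE table_name PARTITION (dt = '11-05-2016') COMPACT 'major';
```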
Created 05-11-2016 06:11 PM
Hi @Chris McGuire,
Can you please provide an "hdfs dfs -ls -R <table-folder>"
Compaction only operates on tables with delta directories. I suspect that the method you're using (SaveMode.Append) is just appending to the existing partition (or adding a new partition) and not actually creating deltas.
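For comparison, a transactional table that the compactor can work on has per-transaction delta directories under each partition, along these lines (an illustrative layout, not output from your cluster):

```
/test_data/test_tbl/dt=11-05-2016/delta_0000001_0000001/bucket_00000
/test_data/test_tbl/dt=11-05-2016/delta_0000002_0000002/bucket_00000
/test_data/test_tbl/dt=11-05-2016/base_0000002/bucket_00000   <-- after a major compaction
```

If you only see flat part-xxxxx files, compaction has nothing to do.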
Best,
Eric
Created 05-11-2016 06:55 PM
@Eric Walk Thanks, yes you are correct: Spark isn't writing deltas, it's just adding files to the existing partition.
Any idea how to get Spark to write the deltas?
-rw-r--r--   3 cmcguire hdfs      0 2016-05-11 16:36 /test_data/test_test_tbl/_SUCCESS
drwxr-xr-x   - cmcguire hdfs      0 2016-05-11 16:40 /test_data/test_tbl/dt=11-05-2016
-rwxr-xr-x   3 cmcguire hdfs   3750 2016-05-11 16:37 /test_data/test_tbl/dt=11-05-2016/part-00000
-rwxr-xr-x   3 cmcguire hdfs   5468 2016-05-11 16:37 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_1
-rwxr-xr-x   3 cmcguire hdfs   8264 2016-05-11 16:38 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_2
-rwxr-xr-x   3 cmcguire hdfs   7068 2016-05-11 16:38 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_3
-rwxr-xr-x   3 cmcguire hdfs   5157 2016-05-11 16:39 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_4
-rwxr-xr-x   3 cmcguire hdfs  10684 2016-05-11 16:39 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_5
-rwxr-xr-x   3 cmcguire hdfs   4796 2016-05-11 16:40 /test_data/test_tbl/dt=11-05-2016/part-00000_copy_6
Created 05-11-2016 07:09 PM
@Chris McGuire, in that case I don't think you're using the Hive Streaming API. I'm not sure how Spark Streaming is set up to write out to Hive, so it may be behaving correctly.
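To get real delta files from a streaming job, the usual route is the Hive Streaming API (hive-hcatalog-streaming) rather than saveAsTable. A rough sketch, where the metastore URI, column names, and partition value are placeholders for your environment:

```scala
// Sketch of the Hive Streaming API, which writes delta files the compactor
// can merge. Class names are from Hive 1.x (hive-hcatalog-streaming);
// the endpoint and table details below are illustrative.
import org.apache.hive.hcatalog.streaming.{DelimitedInputWriter, HiveEndPoint}
import scala.collection.JavaConverters._

val endPoint = new HiveEndPoint(
  "thrift://metastore-host:9083",     // metastore URI (placeholder)
  "default", "test_tbl",
  List("11-05-2016").asJava)          // partition values for dt=11-05-2016

val connection = endPoint.newConnection(true)  // true = create partition if needed
val writer = new DelimitedInputWriter(Array("col1", "col2"), ",", endPoint)

val batch = connection.fetchTransactionBatch(10, writer)
batch.beginNextTransaction()
batch.write("a,1".getBytes("UTF-8"))
batch.commit()
batch.close()
connection.close()
```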
Created 05-11-2016 06:28 PM
Make sure these properties are set to true, as they merge the small files into one or more larger files:
hive.merge.mapfiles
hive.merge.mapredfiles
hive.merge.tezfiles
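These can be set per session in Hive, e.g.:

```sql
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.tezfiles=true;
-- optional: size thresholds that control when merging kicks in
SET hive.merge.smallfiles.avgsize=16000000;
SET hive.merge.size.per.task=256000000;
```

Note these only apply to jobs Hive itself runs; they won't merge files written directly by Spark.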
Created 05-11-2016 06:56 PM
Thanks, yes those properties are set. I believe it's something to do with how the data is being written to Hive via Spark Streaming.
Created 05-11-2016 07:19 PM
Yes, that could well be the reason. There is a property for Hive to merge Spark files: hive.merge.sparkfiles, which is false by default. You may want to enable it, and also look at this wiki for Hive-on-Spark configuration:
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
Created 05-11-2016 07:25 PM
Thanks, I will make sure the Spark version of the property is set.
Thanks for the help. I wonder if, instead of rdd.toDF().saveAsTable, I should be writing INSERT statements; that might force the delta files to be created.
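For the INSERT route, something like this via HiveContext might be worth trying (the temp-table name is arbitrary, and the table and partition are from my test setup above):

```scala
// Register the micro-batch as a temp table and let Hive run the INSERT,
// so the write goes through Hive's own path rather than saveAsTable.
val df = rdd.toDF()
df.registerTempTable("batch_tmp")
hiveContext.sql(
  "INSERT INTO TABLE test_tbl PARTITION (dt = '11-05-2016') SELECT * FROM batch_tmp")
```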
Created 05-11-2016 07:29 PM
@Chris McGuire, that is probably the case. I'm not very familiar with the way Spark is configured, but I do know that, generally speaking, unless you explicitly use INSERT or the Hive Streaming API, you don't have deltas and don't need to worry about compaction. The partition-append merging is a whole different story...