
Spark 2 Dataframe Save to Hive - Compaction

I am using a SparkSession to save a DataFrame to a Hive table. The code is as below:

df.write.mode(SaveMode.Append).format("orc").insertInto("table")

The data comes to Spark from Kafka, and it can be a huge amount of data arriving throughout the day. Does the Spark DataFrame save internally do Hive compaction? If not, what is the best way to do compaction at regular intervals without affecting inserts into the table?

4 REPLIES

Re: Spark 2 Dataframe Save to Hive - Compaction

Expert Contributor

Hi @PremKumar Karunakaran,

It depends on your use case:

- You can execute a repartition / coalesce on your DataFrame before inserting it into Hive (see the sketch after the bucketing example below).

- You can do bucketing:

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
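
For the first option, a minimal sketch (reusing the df and table name from the question; the partition count 8 is only illustrative and should be tuned to your data volume) could be:

// coalesce merges partitions without a shuffle; use repartition(8) instead if you
// also want evenly sized files (at the cost of a shuffle)
df.coalesce(8)
  .write.mode(SaveMode.Append)
  .format("orc")
  .insertInto("table")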

Re: Spark 2 Dataframe Save to Hive - Compaction

Expert Contributor
@PremKumar Karunakaran

Can you please check whether compaction is enabled in the Hive configs? If yes, then Hive compaction should already be happening. If not, you can enable it from Ambari --> Hive --> Configs --> Settings --> Run Compactor; it will do the compaction based on the properties:

hive.compactor.check.interval 
hive.compactor.delta.num.threshold
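
Note that the Hive compactor only acts on transactional (ACID) tables. If the table is transactional, a minimal sketch for checking and manually requesting compaction from Hive (database, table, and partition names here are illustrative) would be:

-- show compactions that are queued, running, or finished
SHOW COMPACTIONS;

-- request a major compaction for one partition on demand
ALTER TABLE my_db.my_table PARTITION (trade_date = '2019-01-01') COMPACT 'major';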

Re: Spark 2 Dataframe Save to Hive - Compaction

Expert Contributor

@PremKumar Karunakaran did you solve your issue?

Re: Spark 2 Dataframe Save to Hive - Compaction

New Contributor

I have written a common utility in Java to do the following:

1) Compaction at the whole-table level:

  • hive > Create external table stg.dummy_table_compacted like stg.dummy_table;
  • spark.sql("insert overwrite table stg.dummy_table_compacted partition(trade_date,dwh_business_date,region_cd) select * from stg.dummy_table");
  • hdfs > hadoop fs -rm -r -skipTrash hdfs://hdfs-path/data/staging/hive/stg/dummy_table
  • spark.sql("insert overwrite table stg.dummy_table partition(trade_date,dwh_business_date,region_cd) select * from stg.dummy_table_compacted");

Note: the second overwrite only does a file copy, so it takes less time.
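
As a rough sketch only (not the poster's actual utility), the three whole-table steps could be wrapped into one Spark method; the table and partition column names are taken from the steps above, and the HDFS path is passed in by the caller:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Merges small files by rewriting stg.dummy_table through stg.dummy_table_compacted.
// Assumes the _compacted table already exists and dynamic partitioning is enabled
// (hive.exec.dynamic.partition.mode=nonstrict).
def compactWholeTable(spark: SparkSession, tableHdfsPath: String): Unit = {
  spark.sql("insert overwrite table stg.dummy_table_compacted " +
    "partition(trade_date,dwh_business_date,region_cd) select * from stg.dummy_table")

  // equivalent of: hadoop fs -rm -r -skipTrash <tableHdfsPath>
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  fs.delete(new Path(tableHdfsPath), true)

  spark.sql("insert overwrite table stg.dummy_table " +
    "partition(trade_date,dwh_business_date,region_cd) select * from stg.dummy_table_compacted")
}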


2) Compaction at the level of a single partition:


1)spark.sql("select distinct partition_col1 as b_partition_col1,partition_col2 as b_partition_col2 ,partition_col3 as b_partition_col3 from stg.dummy_table").registerTempTable("distinct_partitions")

2)spark.sql("insert overwrite table stg.dummy_table_compacted partition(partition_col1,partition_col2,partition_col3) select a.* from stg.dummy_table a, distinct_partitions b where a.partition_col1 = b.b_partition_col1 and a.partition_col2 =b.b_partition_col2 and a.partition_col3 = b.b_partition_col3 and a.partition_col1 = '50'");

3)hadoop fs -rm -r -skipTrash hdfs://dev-icg/data/user/stg/hive/stg/dummy_table/partition_col1=50/*

4)spark.sql("insert overwrite table stg.dummy_table partition(partition_col1,partition_col2,partition_col3) select a.* from stg.dummy_table_compacted a, distinct_partitions b where a.partition_col1 = b.b_partition_col1 and a.partition_col2 = b.b_partition_col2 and a.partition_col3 = b.b_partition_col3 and a.partition_col1 = '50'");


3) Compaction at the level of two partitions:


1) spark.sql("select distinct partition_col1 as b_partition_col1,partition_col2 as b_partition_col2 ,partition_col3 as b_partition_col3 from stg.dummy_table").registerTempTable("distinct_partitions")

2) spark.sql("insert overwrite table stg.dummy_table_compacted partition(partition_col1,partition_col2,partition_col3) select a.* from stg.dummy_table a, distinct_partitions b where a.partition_col1 = b.b_partition_col1 and a.partition_col2 =b.b_partition_col2 and a.partition_col3 = b.b_partition_col3 and a.partition_col1 = '50' and a.partition_col2='20'");

3) hadoop fs -rm -r -skipTrash hdfs://hdfs-path/data/user/stg/hive/stg/dummy_table/partition_col1=50/partition_col2=20/*

4) spark.sql("insert overwrite table stg.dummy_table partition(partition_col1,partition_col2,partition_col3) select a.* from stg.dummy_table_compacted a, distinct_partitions b where a.partition_col1 = b.b_partition_col1 and a.partition_col2 =b.b_partition_col2 and a.partition_col3 = b.b_partition_col3 and a.partition_col1 = '50' and a.partition_col2='20'");