Hive Streaming Compaction

This is the second part of the Hive Streaming Article series. In this article we will review the issues around compacting Hive Streaming files.

One of the results of ingesting data through Hive streaming is the creation of many small 'Delta' files. Left uncompacted, these files can lead to NameNode capacity problems. Fortunately, compaction functionality is part of Hive Streaming. The remainder of this article reviews design considerations as well as the commands necessary to enable and control compaction for your Hive tables.

Hive Compaction Design considerations

Compaction relies on a set of cleaner processes that run in the background during ingest and look for opportunities to compact the delta files based on the rules you specify.

The first thing to keep in mind is that there are two forms of compaction: ‘minor’ and ‘major’. A ‘minor’ compaction consolidates only the delta files. Because it does not have to merge the delta files with a large set of base bucket files, it is the less disruptive of the two for system resources. A ‘major’ compaction consolidates all of the delta files, just like a ‘minor’ compaction, and in addition merges them with the base to produce a very clean physical layout for the Hive table. However, major compactions can take minutes to hours and can consume a lot of disk, network, memory and CPU resources, so they should be invoked carefully.

Beyond the compactor configuration options, there are two ways to invoke compaction, which gives you greater control over the process and helps avoid impacting other workloads: it can be triggered automatically by the cleaner threads, or initiated manually when system load is low.

The primary compaction configuration triggers to review when implementing or tuning your compaction processes are:

  • hive.compactor.initiator.on - Whether to run the initiator and cleaner threads on this metastore instance.
  • hive.compactor.cleaner.run.interval - Time in milliseconds between runs of the cleaner thread.
  • hive.compactor.delta.num.threshold - Number of delta directories in a table or partition that will trigger a minor compaction.
  • hive.compactor.delta.pct.threshold - Percentage (fractional) size of the delta files relative to the base that will trigger a major compaction. 1 = 100%, so the default 0.1 = 10%.
  • hive.compactor.abortedtxn.threshold - Number of aborted transactions involving a given table or partition that will trigger a major compaction.
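
To make the three threshold rules above concrete, here is a minimal shell sketch of how they combine. It is a simplification for illustration, not Hive's actual initiator logic, which handles further cases (for example, tables that have no base yet). The function name `compaction_type` and the environment variable names are hypothetical; the fallback values 10, 0.1 and 1000 are assumed defaults, so check them against your Hive version.

```shell
#!/usr/bin/env bash
# Simplified sketch of the compaction trigger rules (NOT Hive's real code).
# usage: compaction_type <num_delta_dirs> <delta_bytes> <base_bytes> <aborted_txns>
# Thresholds come from hypothetical environment variables; the fallbacks
# are assumed defaults for the matching hive.compactor.* properties.
compaction_type() {
  awk -v n="$1" -v d="$2" -v b="$3" -v a="$4" \
      -v num_thr="${DELTA_NUM_THRESHOLD:-10}" \
      -v pct_thr="${DELTA_PCT_THRESHOLD:-0.1}" \
      -v abort_thr="${ABORTED_TXN_THRESHOLD:-1000}" '
    BEGIN {
      # Too many aborted transactions: major compaction.
      if (a > abort_thr)            { print "major"; exit }
      # Deltas large relative to the base: major compaction.
      if (b > 0 && d / b > pct_thr) { print "major"; exit }
      # Many delta directories: minor compaction.
      if (n > num_thr)              { print "minor"; exit }
      print "none"
    }'
}
```

For example, `compaction_type 300 1000 100000 0` describes 300 small deltas that total 1% of the base size, so only the delta-count rule fires and the sketch reports a minor compaction.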

A Hive Compaction Manual example

In our example we have turned off major compaction, as it should only run during off-peak periods. We take a look at the delta files for our table in HDFS and see that there are over 300 delta files and 5 base files.

[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 13:17        /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:18        /apps/hive/warehouse/acidtest/delta_2113501_2123500
-rw-r--r-- 3 mjohnson hdfs 482990 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002
-rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18     /apps/hive/warehouse/acidtest/delta_2113501_2123500/bucket_00002_flush_length
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17        /apps/hive/warehouse/acidtest/delta_2123501_2133500
-rw-r--r-- 3 mjohnson hdfs 482784 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001
-rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18     /apps/hive/warehouse/acidtest/delta_2123501_2133500/bucket_00001_flush_length
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17        /apps/hive/warehouse/acidtest/delta_2133501_2143500
-rw-r--r-- 3 mjohnson hdfs 482110 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001
-rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18     /apps/hive/warehouse/acidtest/delta_2133501_2143500/bucket_00001_flush_length
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17        /apps/hive/warehouse/acidtest/delta_2143501_2153500
-rw-r--r-- 3 mjohnson hdfs 476285 2016-03-27 13:18   /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000
-rw-r--r-- 3 mjohnson hdfs 1600 2016-03-27 13:18     /apps/hive/warehouse/acidtest/delta_2143501_2153500/bucket_00000_flush_length
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:17        /apps/hive/warehouse/acidtest/delta_2153501_2163500
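
Rather than counting entries by eye, a listing like the one above can be summarized with a small filter. The sketch below assumes the `hadoop fs -ls -R` output format shown here (permissions in the first field, path in the last, no spaces in paths); the function name `count_dirs` is hypothetical.

```shell
#!/usr/bin/env bash
# count_dirs <prefix>: read `hadoop fs -ls -R` output on stdin and count
# the directory entries whose final path component starts with <prefix>.
count_dirs() {
  awk -v p="$1" '
    $1 ~ /^d/ {                        # directory lines start with "d"
      n = split($NF, parts, "/")       # last field is the full path
      if (index(parts[n], p) == 1) c++
    }
    END { print c + 0 }'
}

# Example usage (requires a Hadoop client):
#   hadoop fs -ls -R /apps/hive/warehouse/acidtest | count_dirs delta_
#   hadoop fs -ls -R /apps/hive/warehouse/acidtest | count_dirs base_
```

The delta directory count is the figure that hive.compactor.delta.num.threshold is compared against, so it is a useful number to track during ingest.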

A decision has been made to run the major compaction manually during the evening lull, so we execute the “ALTER TABLE {tablename} COMPACT ‘major’” command to place the compaction job into the queue for processing. A dedicated resource management queue with a limited resource quota was defined for compaction, so the compaction will not impact other jobs.

hive> alter table acidtest compact 'major';
Compaction enqueued.
OK
Time taken: 0.037 seconds
hive> show compactions;
OK
Database Table Partition Type State Worker Start Time
default acidtest NULL MAJOR working server2.hdp-26 1459100244000
Time taken: 0.019 seconds, Fetched: 2 row(s)
hive> show compactions;
OK
Database Table Partition Type State Worker Start Time
Time taken: 0.016 seconds, Fetched: 1 row(s)
hive>;
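
If the manual compaction is to be launched from a scheduled script rather than typed interactively, the statement can be generated by a small helper. This is only a sketch: the function name `compact_stmt` is hypothetical, and the optional partition clause assumes the standard `ALTER TABLE ... PARTITION (...) COMPACT` form.

```shell
#!/usr/bin/env bash
# Print an ALTER TABLE ... COMPACT statement for the Hive CLI.
# usage: compact_stmt <table> <minor|major> [partition_spec]
compact_stmt() {
  local table="$1" type="$2" part="${3:-}"
  case "$type" in
    minor|major) ;;
    *) echo "compaction type must be 'minor' or 'major'" >&2; return 1 ;;
  esac
  if [ -n "$part" ]; then
    printf "ALTER TABLE %s PARTITION (%s) COMPACT '%s';\n" "$table" "$part" "$type"
  else
    printf "ALTER TABLE %s COMPACT '%s';\n" "$table" "$type"
  fi
}

# Example usage (requires the Hive CLI):
#   hive -e "$(compact_stmt acidtest major)"
```

Validating the compaction type before calling Hive keeps a typo in a cron entry from failing silently in the middle of the night.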

The outstanding table compaction jobs are visible by executing the “SHOW COMPACTIONS” command, as illustrated in the example above. The ‘major’ compaction is also visible through the Applications history log. After the ‘major’ compaction has completed, all of the delta files available at the time the compaction was initiated will have been rolled up into the ‘base’ files.
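
A script can wait for the compaction to finish by checking the SHOW COMPACTIONS output for rows that are still in flight. The sketch below assumes tab-separated output with the column order shown in the transcript above (Database, Table, Partition, Type, State, ...) and treats the states `initiated` and `working` as outstanding; the function name `pending_compactions` is hypothetical.

```shell
#!/usr/bin/env bash
# pending_compactions <table>: read SHOW COMPACTIONS output on stdin and
# print how many compactions for <table> are still queued or running.
# Assumes tab-separated columns: Database, Table, Partition, Type, State, ...
pending_compactions() {
  awk -F'\t' -v t="$1" '
    $2 == t && ($5 == "initiated" || $5 == "working") { c++ }
    END { print c + 0 }'
}

# Example polling loop (requires the Hive CLI):
#   while [ "$(hive -S -e 'SHOW COMPACTIONS' | pending_compactions acidtest)" -gt 0 ]; do
#     sleep 30
#   done
```

The header row is skipped naturally, because its second column is the literal word "Table" rather than a table name.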

[hive@server1 ~]$ hadoop fs -ls -R /apps/hive/warehouse/acidtest
-rw-r--r-- 3 mjohnson hdfs 4 2016-03-27 13:17       /apps/hive/warehouse/acidtest/_orc_acid_version
drwxrwxrwx - mjohnson hdfs 0 2016-03-27 13:37       /apps/hive/warehouse/acidtest/base_2213500
-rw-r--r-- 3 mjohnson hdfs 72704 2016-03-27 13:37   /apps/hive/warehouse/acidtest/base_2213500/bucket_00000
-rw-r--r-- 3 mjohnson hdfs 436159 2016-03-27 13:37  /apps/hive/warehouse/acidtest/base_2213500/bucket_00001
-rw-r--r-- 3 mjohnson hdfs 219572 2016-03-27 13:37  /apps/hive/warehouse/acidtest/base_2213500/bucket_00002
[hive@server1 ~]$

The end result of this example is that 305 files were consolidated to just 5 files. While 300 files will not impact NameNode performance, consolidating them will most likely improve query performance, as the Hive engine will have fewer files to scan to execute the query.

Bibliography

Hopefully, the example and source code supplied with this blog posting are sufficient to get you started with Hive Streaming and avoid potential problems. In addition to this blog posting some other resources which are useful references include:
