Question: Can I reduce the time it takes for a merge to execute for a ~400 million row table?
I'm using the ACID MERGE functionality, similar to the posts hive-acid-merge-by-example and update-hive-tables-easy-way-2, to track Slowly Changing Dimensions. It works as expected and captures the time-series data correctly. However, this Hortonworks post reports a merge that completed faster than mine despite a larger staging table and fewer nodes.
I'd really like to cut down the execution time, though: it currently takes ~30-50 minutes to merge 200k-500k staging records into the 400 million row destination table.
DDL and SQL: (simplified)
--Table containing new records from a source table
--update_stamp tracks when the row was last updated in the source
CREATE TABLE db.stg_tbl (
  id int,
  nbr smallint,
  column3 decimal(9,2),
  update_stamp timestamp
) STORED AS ORC;

--Destination table, tracking the valid_from and valid_to time,
--where a NULL valid_to means the row is the current state, and a
--timestamp in that column means it is a historical record, with a
--newer row showing the current state of that id, nbr combo
CREATE TABLE db.dest_tbl (
  id int,
  nbr smallint,
  column3 decimal(9,2),
  update_stamp timestamp,
  VALID_FROM timestamp,
  VALID_TO timestamp
)
CLUSTERED BY (id, nbr) INTO 24 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
MERGE INTO db.dest_tbl target
USING (
  --Base staging rows, with the join keys aliased for the ON clause
  SELECT stg_a.id AS join_id, stg_a.nbr AS join_nbr, stg_a.*
  FROM db.stg_tbl stg_a
  UNION ALL
  --Generate an extra row for changed records; the NULL join keys
  --guarantee these rows fall through to the INSERT clause
  SELECT NULL, NULL, stg_b.*
  FROM db.stg_tbl stg_b
  JOIN db.dest_tbl tgt
    ON stg_b.id = tgt.id AND stg_b.nbr = tgt.nbr
  WHERE stg_b.update_stamp <> tgt.update_stamp
    AND tgt.valid_to IS NULL
) sub
ON sub.join_id = target.id AND sub.join_nbr = target.nbr
WHEN MATCHED AND sub.update_stamp <> target.update_stamp AND target.valid_to IS NULL THEN
  UPDATE SET valid_to = current_timestamp()
WHEN NOT MATCHED THEN
  INSERT VALUES (sub.id, sub.nbr, sub.column3, sub.update_stamp, current_timestamp(), NULL);
Note that the table above is clustered by the two join columns, which is necessary here, into 24 buckets. I chose that number so each bucket/file holds roughly one to three HDFS blocks, trying to stay within a 1-10 blocks per file recommendation I read.
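To sanity-check that blocks-per-bucket math, I list the bucket files from the Hive CLI and compare each file's size to the HDFS block size. The warehouse path below is the HDP default and may differ on your cluster:

```sql
-- List the bucket files (and any delta directories) for the
-- destination table, recursively.
dfs -ls -R /apps/hive/warehouse/db.db/dest_tbl;
```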
Bumping the buckets up to 48, just to double it and observe the side effects, did little to nothing on its own. With the hive parameters below, though, I saw a 45% reduction in runtime, from 2909 seconds to 1547 seconds (I'm not sure whether other jobs running on the cluster skewed this). My concern is that adding more buckets only creates more small files that don't even fill one block. Not to mention, the delta directories created by each merge contain only the 200-500k records, which is a small amount of data to spread across the buckets.
SET hive.merge.cardinality.check=false;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.vectorized.execution.enabled=false;
SET hive.vectorized.execution.reduce.enabled=false;
SET hive.enforce.bucketing=true;
SET hive.exec.parallel=true;
SET hive.auto.convert.join=false;
SET hive.enforce.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;
SET hive.optimize.bucketmapjoin=true;
There is a third reducer stage that consistently takes the most time, and it runs with very few reducers (2, or 6 in the 48-bucket attempt).
Any insight as to how I could create this table or tune the query would be helpful.
what version are you on? Is hive.tez.dynamic.semijoin.reduction available/enabled?
Small files are a transient issue - compaction will merge them into fewer.
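If you don't want to wait for the automatic compactor, you can also request a compaction by hand; both statements below are standard Hive ACID syntax:

```sql
-- Rewrite all base and delta files into a single new base per bucket.
ALTER TABLE db.dest_tbl COMPACT 'major';

-- Compactions run asynchronously in the metastore's worker threads;
-- this shows their queue and state.
SHOW COMPACTIONS;
```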
A side note: hive.merge.cardinality.check=false is probably a bad idea. Disabling it should make very little difference for perf, but it could lead to data corruption if the condition it checks for is violated (i.e. if more than one row from the source matches the same row on target).
HDP 2.6.1 is the version. Hive is telling me "hive.tez.dynamic.semijoin.reduction is undefined" and then "Query returned non-zero code: 1, cause: hive configuration hive.tez.dynamic.semijoin.reduction does not exists." if I try to set it to true.
Thanks for the response, Eugene! I was more or less experimenting with the cardinality check; I'll remove it and see whether it alone affects performance. I did read that warning when I enabled it, but since it said the check could impact performance I tried it out. Considering the file sizes, would you say there is a maximum bucket size to watch for? The consensus seems to be that there is no tried-and-true rule for bucket count.
When choosing the number of buckets, I would think about your "largest" write, since the parallelism of the write is limited to the bucket count. Note that UPDATE/DELETE (and MERGE) are writes.
From the read side, I would optimize the bucket count for reading a fully (major-)compacted table. That said, since ACID tables require a system-defined sort order they do not support SMB joins, and I'm not sure what else can benefit from bucketing.
Since the writes vary greatly, I would not worry much about file sizes in the delta directories (meaning I don't think there is a good way to get this right), and instead make sure that compaction runs frequently enough to mitigate the small files.
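The knobs that control how eagerly automatic compaction fires are below. Note these are metastore-side settings (hive-site.xml on the metastore host), written in SET form here only for readability, and the values are illustrative; check your own hive-site for the actual defaults:

```sql
-- Must be on for the metastore to schedule compactions at all,
-- with at least one worker thread to run them.
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

-- Minor compaction triggers after this many delta directories accumulate.
SET hive.compactor.delta.num.threshold=10;

-- Major compaction triggers once total delta size exceeds this
-- fraction of the base size.
SET hive.compactor.delta.pct.threshold=0.1;
```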
hive.tez.dynamic.semijoin.reduction should be available in Hive 2 on HDP 2.6.1. The implementation of MERGE uses a right outer join (when you have an INSERT clause), and semijoin reduction is designed to help exactly this case, where the inner side (the target of the merge) is large.
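If it does turn out to be available on your build, the relevant settings look like this; the threshold values are illustrative, not tuned recommendations:

```sql
-- Build a bloom filter from the small (staging) side and use it to
-- prune the large (target) side of the right outer join.
SET hive.tez.dynamic.semijoin.reduction=true;

-- Only consider the optimization when the big side is at least this
-- large, and bound the size of the bloom filter that gets built.
SET hive.tez.bigtable.minsize.semijoin.reduction=100000000;
SET hive.tez.min.bloom.filter.entries=1000000;
SET hive.tez.max.bloom.filter.entries=100000000;
```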