Are there optimizations for ACID Merge with 100+ million rows?

New Contributor

Question: Can I reduce the time it takes for a MERGE to execute against a ~400 million row table?

I'm using the ACID MERGE functionality, similar to the posts hive-acid-merge-by-example & update-hive-tables-easy-way-2, to track Slowly Changing Dimensions. It is working as expected for capturing time-series data. However, that Hortonworks post reports a shorter merge time than mine, with more records in the staging table and fewer nodes.

I'd really like to cut down on the time this task takes to execute, though. It currently takes ~30-50 minutes to merge 200k-500k records into the 400 million row destination table.

DDL and SQL (simplified):

--Table containing new records from a source table
--  update_stamp tracks when the row was updated last in source
CREATE TABLE db.stg_tbl
 (
     id int,
     nbr smallint,
     column3 decimal(9,2),
     update_stamp timestamp
 )
 STORED AS ORC;
--Destination table, tracking the valid_from and valid_to time
--    a NULL valid_to means the row is the current state;
--    a non-NULL timestamp in this column means it is a historical record,
--    with a newer row showing the current state of this id, nbr combo
CREATE TABLE db.dest_tbl
(
    id int,
    nbr smallint,
    column3 decimal(9,2), 
    update_stamp timestamp, 
    VALID_FROM timestamp,
    VALID_TO timestamp
)
CLUSTERED BY (id, nbr) INTO 24 BUCKETS
STORED AS ORC
TBLPROPERTIES("transactional"="true");
MERGE INTO db.dest_tbl target
USING (
    --base staging
    SELECT
        stg_a.id  AS join_id,
        stg_a.nbr AS join_nbr,
        stg_a.*
    FROM
        db.stg_tbl stg_a
    UNION ALL
    --Generate an extra row for changed records; the NULL
    --join_id/join_nbr key means it will be inserted
    SELECT
        NULL,
        NULL,
        stg_b.*
    FROM
        db.stg_tbl stg_b
    JOIN
        db.dest_tbl tgt
        ON
        stg_b.id = tgt.id AND
        stg_b.nbr = tgt.nbr
    WHERE
        (stg_b.update_stamp <> tgt.update_stamp) AND
        tgt.valid_to IS NULL
) sub
ON
    sub.join_id = target.id AND
    sub.join_nbr = target.nbr
WHEN MATCHED
    AND sub.update_stamp <> target.update_stamp AND target.valid_to IS NULL
    THEN UPDATE SET valid_to = current_timestamp()
WHEN NOT MATCHED
    THEN INSERT VALUES (sub.id, sub.nbr, sub.column3, sub.update_stamp, current_timestamp(), NULL);

Note that the table above is clustered by the two key columns, which is necessary here, into 24 buckets. I chose that number so each bucket/file spans roughly one to three blocks, trying to stay within a 1-10 blocks-per-file recommendation I read.
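(As a sanity check on the bucket sizing, I've been looking at the file count and total size that Hive reports in the table metadata; this is just standard Hive, nothing specific to my setup.)

 --numFiles and totalSize show up under Table Parameters
 DESCRIBE FORMATTED db.dest_tbl;
 --refresh basic table statistics first if they look stale
 ANALYZE TABLE db.dest_tbl COMPUTE STATISTICS;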

Bumping the buckets up to 48, just to double the count and see the side effects, does little to nothing. With the hive parameters below, I did see a ~45% reduction in runtime, from 2909 seconds to 1547 seconds (though I'm not sure whether other jobs running at the time affected this). My concern is that adding more buckets only creates more small files that don't even fill one block. On top of that, the delta files created by each merge contain only the 200-500k records, which is a small amount of data to spread across the buckets.

 SET hive.merge.cardinality.check=false;
 SET hive.exec.dynamic.partition = true;
 SET hive.exec.dynamic.partition.mode = nonstrict;
 SET hive.vectorized.execution.enabled=false;
 SET hive.vectorized.execution.reduce.enabled=false;
 SET hive.enforce.bucketing=true;
 SET hive.exec.parallel=true;
 SET hive.auto.convert.join=false;
 SET hive.enforce.bucketmapjoin=true;
 SET hive.optimize.bucketmapjoin.sortedmerge=true;
 SET hive.optimize.bucketmapjoin=true;

There is a third reducer stage that seems to take most of the time, and it runs with a low number of reducers (2, or 6 with the 48-bucket attempt).
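For what it's worth, the only knobs I know of to push that stage wider are the general reducer settings below; I'm not sure they actually apply to this particular vertex, so this is just an experiment I've been considering:

 --assumption: a smaller bytes-per-reducer target makes the planner allocate more reducers
 SET hive.exec.reducers.bytes.per.reducer=67108864;
 --upper bound on the number of reducers the planner will use
 SET hive.exec.reducers.max=99;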

Any insight as to how I could create this table or tune the query would be helpful.

3 REPLIES

Re: Are there optimizations for ACID Merge with 100+ million rows?

Expert Contributor

What version are you on? Is hive.tez.dynamic.semijoin.reduction available/enabled?

Small files are a transient issue - compaction will merge them into fewer, larger files.
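For example, with your db.dest_tbl you can queue a compaction manually and watch its progress; this is the standard syntax, so check it against your version:

 --queue a major compaction for the destination table
 ALTER TABLE db.dest_tbl COMPACT 'major';
 --see queued/working/ready-for-cleaning compactions
 SHOW COMPACTIONS;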

A side note: hive.merge.cardinality.check=false is probably a bad idea. It should make very little difference for performance, but it could lead to data corruption if the condition it checks for is violated (i.e. if more than one row from the source matches the same row in the target).

Re: Are there optimizations for ACID Merge with 100+ million rows?

New Contributor

The version is HDP 2.6.1. Hive tells me "hive.tez.dynamic.semijoin.reduction is undefined", and if I try to set it to true I get "Query returned non-zero code: 1, cause: hive configuration hive.tez.dynamic.semijoin.reduction does not exists."

Thanks for the response, Eugene! I was more or less experimenting with the cardinality check, but I will remove it and see whether it alone affects performance. I did read that warning when I enabled it, but since it said the check could impact performance I tried turning it off. Given the file sizes, would you say there is a maximum bucket size to watch out for? The consensus seems to be that there is no tried-and-true rule for choosing bucket counts.


Re: Are there optimizations for ACID Merge with 100+ million rows?

Expert Contributor

When choosing the number of buckets, I would think about your "largest" write, since the parallelism of a write is limited to the bucket count. Note that update/delete (and merge) are writes.

From the read side, I would optimize the bucket count for reading a fully (major) compacted table. That said, since ACID tables require a system-defined sort order, they do not support SMB joins, and I'm not sure what else can benefit from bucketing.

Since the writes vary greatly, I would not worry much about file sizes in the delta directories (I don't think there is a good way to get that right); instead, make sure that compaction runs frequently enough to mitigate it.
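If you want compaction to kick in sooner for this table, the per-table thresholds can be lowered; the property name below is the standard Hive one, but double-check it against HDP 2.6.1:

 --start a compaction after fewer delta directories accumulate (the global default is 10)
 ALTER TABLE db.dest_tbl SET TBLPROPERTIES (
   "compactorthreshold.hive.compactor.delta.num.threshold"="4"
 );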

hive.tez.dynamic.semijoin.reduction should be available with Hive 2 in HDP 2.6.1. The implementation of MERGE uses a right outer join (if you have an insert clause), and semijoin reduction is designed to help when the inner side (the target of the merge) is large.
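Once you're on a build where it's defined, enabling it is just a config switch; the bloom-filter knobs below exist in Hive 2 as well, but the exact names/defaults vary by release, so treat this as a starting point:

 SET hive.tez.dynamic.semijoin.reduction=true;
 --optional sizing for the bloom filter that semijoin reduction builds
 SET hive.tez.min.bloom.filter.entries=1000000;
 SET hive.tez.max.bloom.filter.entries=100000000;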
