Created 03-21-2016 02:13 AM
Problem Statement: I have a huge history data set in HDFS from which I want to remove duplicates to begin with. The daily ingested data also has to be compared with the history to remove duplicates, and the daily data may contain duplicates within itself as well.
Duplicates could mean
Question:
I see vague solutions around, but they are not well documented and are hard to understand. I have already looked at link, but it's not clear. Code samples would help.
Created 03-21-2016 09:31 PM
TL;DR: Use Hive on Tez. Tez can do Dynamic Partition Pruning, which lets you join on partitions without putting them in your WHERE clause. IT DOES NOT WORK WITH MAPREDUCE, ONLY TEZ!!!
#Longer Version
So this has been done before. Some people look to HBase for this, but the prospect of needing dedicated infrastructure for deduplication is painful at scale, and it may not allow a wide enough temporal window to be stored to dedupe against. For truly big data applications, where you are potentially storing into the triple-digit petabytes, you will want to keep it inside YARN, not outside of it, but you already assumed that much.
In the applications I have worked with, we already have a HASHID in the record, which is ready to be ingested into a master table that we can join against. We also have partitions. The partitions are very important, as they let you reduce the subset of data you are going to check against; we have to do our due diligence to avoid hitting the entire dataset just to be able to ingest deduped records. Using Tez we have something called Dynamic Partition Pruning, which lets us drop partitions of the target 'master' table that are not going to be used. So, using a temp table, we can ingest data from a staging table, deduping each record against the partition it resides in and ingesting it only if it does not already exist.
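For reference, the session settings this approach relies on might look like the sketch below. The property names are standard Hive configuration properties, but the defaults vary by Hive version (pruning is normally already on where Tez is available), so treat the values as assumptions to verify against your own cluster.

```sql
-- Run the query on Tez rather than MapReduce
set hive.execution.engine=tez;

-- Dynamic Partition Pruning for Tez (usually enabled by default)
set hive.tez.dynamic.partition.pruning=true;

-- Allow every partition column to be resolved dynamically on insert
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
```

Running EXPLAIN on the dedupe query is a reasonable way to confirm that pruning actually shows up in the plan.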
PS - A left outer join with a test for null in the WHERE clause is probably better for scaling than UNION DISTINCT if you are worried about a reducer problem. Same join syntax as the example below...
PS 2 - We have found a fun case when you try to use this to dedupe or clean existing master data (like a full table cleanup, no ingestion): if a partition has a single record which is invalid according to the new filtering logic, it will not be removed unless new records are being inserted into that partition. This is actually expected behavior, because a partition is not overwritten unless it is inserted into, so by filtering out the only row which would have made its way back into the table we effectively do nothing to that partition. But again, if you insert anything new which isn't filtered out, or have a good record in there, then it will work.
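A full cleanup of existing master data, with no ingestion, could be sketched roughly as follows. It uses the master_base table defined in the example below. The partition values in the ALTER TABLE statement are hypothetical and only illustrate how to handle the case described in PS 2.

```sql
set hive.exec.dynamic.partition.mode=nonstrict;

-- Rewrite every partition that still has at least one surviving row,
-- keeping a single copy of each (hash, line) pair per partition.
INSERT OVERWRITE TABLE master_base PARTITION(p1, p2, p3)
SELECT DISTINCT hash, line, p1, p2, p3
FROM master_base;

-- If extra filtering logic removes ALL rows of a partition, nothing is
-- written to it and the old data stays in place. Such a partition has to
-- be dropped explicitly (hypothetical partition values shown here).
ALTER TABLE master_base DROP IF EXISTS PARTITION (p1='2', p2='1', p3='1');
```

With DISTINCT alone every partition keeps at least one row, so the explicit drop only becomes necessary once a WHERE filter is added to the SELECT.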
Example -
####
####
create table stage_new
(
hash string,
line string,
p1 string,
p2 string,
p3 string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
##stage_new data location: /apps/hive/warehouse/stage_new
create table master_base
(
hash string,
line string
)
PARTITIONED BY
(
p1 string,
p2 string,
p3 string
);
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE master_base PARTITION(p1, p2, p3)
SELECT * FROM stage_new;
##test data (initially loaded to prepopulate master)
12,THEDUPE,1,1,1
11,test2,1,1,2
10,test3,2,1,1
9,test4,2,1,1
##data with dupe (loaded into the stage_new table after clearing out the initial test data)
12,THEDUPE,1,1,1
8,test1,1,1,1
7,test2,1,1,2
6,test3,3,1,1
##Populate
set hive.exec.dynamic.partition.mode=nonstrict;
create temporary table new_parts
AS select p1, p2, p3 from stage_new group by p1, p2, p3;
##DEDUPE and INSERT
INSERT OVERWRITE TABLE master_base PARTITION(p1, p2, p3)
select hash, line, p1, p2, p3 from stage_new
UNION DISTINCT
select hash, line, mb.p1, mb.p2, mb.p3 from master_base mb JOIN new_parts np ON (mb.p1 = np.p1 AND mb.p2 = np.p2 AND mb.p3 = np.p3);
##master_base BEFORE Insert of dedupe required data##
hive> select * from master_base;
OK
12 THEDUPE 1 1 1
11 test2 1 1 2
10 test3 2 1 1
9 test4 2 1 1
##master_base AFTER Insert##
hive> select * from master_base;
OK
12 THEDUPE 1 1 1
8 test1 1 1 1
11 test2 1 1 2
7 test2 1 1 2
10 test3 2 1 1
9 test4 2 1 1
6 test3 3 1 1
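The LEFT OUTER JOIN alternative mentioned in the first PS could be sketched against the same stage_new and master_base tables like this. It is only a sketch and makes two assumptions: the hash column identifies a record within its partition, and appending with INSERT INTO is acceptable in place of overwriting the touched partitions.

```sql
set hive.exec.dynamic.partition.mode=nonstrict;

-- Append only the staged rows that are not already present in master_base.
INSERT INTO TABLE master_base PARTITION(p1, p2, p3)
SELECT s.hash, s.line, s.p1, s.p2, s.p3
FROM (
  -- collapse exact duplicates inside the staged batch itself
  SELECT DISTINCT hash, line, p1, p2, p3
  FROM stage_new
) s
LEFT OUTER JOIN master_base mb
  ON (s.hash = mb.hash AND s.p1 = mb.p1 AND s.p2 = mb.p2 AND s.p3 = mb.p3)
-- a NULL on the master side means the staged row is new
WHERE mb.hash IS NULL;
```

With the test data above this should skip the THEDUPE row and append the other three, leaving master_base with the same seven rows. Whether Dynamic Partition Pruning is applied to the outer-joined master_base depends on the Hive version and the plan, so check it with EXPLAIN.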
Created 03-25-2016 08:01 PM
@Joseph Niemiec You mentioned "A left outer join with a test for null in the WHERE clause is probably better for scaling than UNION DISTINCT if you are worried about a reducer problem. Same join syntax as the example below..."
How does a left outer join avoid a reducer (unless it's a map join)? Do you recommend a left outer join over UNION DISTINCT?
And on the point "We have found a fun case when you try to use this to dedupe or clean.....", my understanding is that if a partition has 5 records which are duplicates (the initial master load already had them), there is no way to remove them unless a 6th record which is a duplicate of those 5 comes in with the staging load. Am I right? If so, what is your recommendation for removing duplicates in the initial load itself?
Created 03-22-2016 03:00 PM
@Joseph Niemiec It looks like this approach imposes a restriction that the columns compared for duplication have to be the partition columns. Not all columns may qualify as partition columns. Even if I compute a hash of those columns, that hash column may not qualify as a partition column because of its high cardinality. Is there any other option apart from dynamic partition pruning?
Created 03-22-2016 03:45 PM
Partitions are important just to reduce the size of the dataset that you're joining on. If your table is triple-digit petabytes and you have to scan it each time to ingest data, that's not a very smart design. That said, the columns you compare do not have to be partitions!! We have a column which is the hash, and we could have any number of extra columns here as well. We don't partition on the hash column at all. If your use case can handle scanning the entire dataset each time to remove dupes, then don't worry about partitions.
One way or another you need a way to test for uniqueness of the record... A left outer join does this easily, with a test for null to see which records in the 'ingest' dataset are not in the master dataset...
insert into table master_table
select id, col1, col2, colN
FROM (
-- keep only staging rows with no matching id in the master table
select s.id, s.col1, s.col2, s.colN
from Staging_table s LEFT JOIN Master_table m on (s.id = m.id)
WHERE m.id is null
) new_rows;
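Note that a join like this only removes rows that already exist in the master table. Duplicates inside the staging data itself would still be inserted. One possible way to cover both cases in a single pass is to rank the surviving rows per id and keep only the first, as in the sketch below. It assumes a Hive version with windowing functions. The choice of col1 as the tie-breaker is arbitrary and purely illustrative.

```sql
INSERT INTO TABLE master_table
SELECT id, col1, col2, colN
FROM (
  SELECT s.id, s.col1, s.col2, s.colN,
         -- number the copies of each id; which copy gets 1 depends on the ORDER BY
         ROW_NUMBER() OVER (PARTITION BY s.id ORDER BY s.col1) AS rn
  FROM Staging_table s
  LEFT JOIN Master_table m ON (s.id = m.id)
  -- drop staging rows whose id already exists in the master table
  WHERE m.id IS NULL
) ranked
WHERE rn = 1;
```

If the whole row has to match before two records count as duplicates, a SELECT DISTINCT over the staging columns would be the simpler choice.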