Created 03-03-2017 03:43 PM
I have two files on HDFS. One of them (the latest file) contains updates to the other (the previous file). I want to check whether the values of specific columns in the latest file also exist in the previous file (or have the same value), and if so, replace those records of the previous file with the corresponding records from the latest file (i.e., delete such records from the previous file and replace them with the records from the latest file).
In other words, I need to compare each record of the previous file against each record of the latest file on specific columns. If a match is found, delete the whole record from the previous file and replace it with the record from the latest file.
How can I achieve this with Pig?
thanks!
Created 03-04-2017 05:04 PM
Great question. My solution below is trimmed from the presentation described at https://martin.atlassian.net/wiki/x/GYBzAg, which covers a much bigger topic.
So, let's assume you have this original file, which has an ID, a date created, and three "payload" attributes.
[root@sandbox ~]# hdfs dfs -cat origs.csv
11,2014-09-17,base,base,base
12,2014-09-17,base,base,base
13,2014-09-17,base,base,base
14,2014-09-18,base,base,base
15,2014-09-18,base,base,base
16,2014-09-18,base,base,base
17,2014-09-19,base,base,base
18,2014-09-19,base,base,base
19,2014-09-19,base,base,base
Now, let's assume you have a delta file that has 4 new records (IDs 10, 20, 21, and 22) as well as more recent versions of 3 other records (IDs 12, 14, and 16).
[root@sandbox ~]# hdfs dfs -cat delta.csv
10,2014-09-16,oops,was,missed
20,2014-09-20,base,base,base
21,2014-09-20,base,base,base
22,2014-09-20,base,base,base
12,2014-09-17,base,CHANGED,base
14,2014-09-18,base,CHANGED,base
16,2014-09-18,base,CHANGED,base
Then, in a Pig script you could join these like this.
origs = LOAD '/user/maria_dev/hcc/86778/original.csv' USING PigStorage(',') AS
    ( bogus_id:int, date_cr:chararray,
      field1:chararray, field2:chararray, field3:chararray );

delta = LOAD '/user/maria_dev/hcc/86778/delta.csv' USING PigStorage(',') AS
    ( bogus_id:int, date_cr:chararray,
      field1:chararray, field2:chararray, field3:chararray );

joined = JOIN origs BY bogus_id FULL OUTER, delta BY bogus_id;

DESCRIBE joined;
DUMP joined;
And get this output.
joined: {origs::bogus_id: int,origs::date_cr: chararray,origs::field1: chararray,origs::field2: chararray,origs::field3: chararray,delta::bogus_id: int,delta::date_cr: chararray,delta::field1: chararray,delta::field2: chararray,delta::field3: chararray}
(,,,,,10,2014-09-16,oops,was,missed)
(11,2014-09-17,base,base,base,,,,,)
(12,2014-09-17,base,base,base,12,2014-09-17,base,CHANGED,base)
(13,2014-09-17,base,base,base,,,,,)
(14,2014-09-18,base,base,base,14,2014-09-18,base,CHANGED,base)
(15,2014-09-18,base,base,base,,,,,)
(16,2014-09-18,base,base,base,16,2014-09-18,base,CHANGED,base)
(17,2014-09-19,base,base,base,,,,,)
(18,2014-09-19,base,base,base,,,,,)
(19,2014-09-19,base,base,base,,,,,)
(,,,,,20,2014-09-20,base,base,base)
(,,,,,21,2014-09-20,base,base,base)
(,,,,,22,2014-09-20,base,base,base)
You'll see above that when the delta record's fields are present (the ones on the right side), they should be the ones carried forward, since they represent either new (4) or modified (3) records; when they are missing (6), the original values should simply roll forward.
merged = FOREACH joined GENERATE
    ((delta::bogus_id is not null) ? delta::bogus_id : origs::bogus_id) AS bogus_id,
    ((delta::date_cr  is not null) ? delta::date_cr  : origs::date_cr)  AS date_cr,
    ((delta::field1   is not null) ? delta::field1   : origs::field1)   AS field1,
    ((delta::field2   is not null) ? delta::field2   : origs::field2)   AS field2,
    ((delta::field3   is not null) ? delta::field3   : origs::field3)   AS field3;

DESCRIBE merged;
DUMP merged;
As you can see from the combined output, we have the necessary 13 rows in the new dataset.
merged: {bogus_id: int,date_cr: chararray,field1: chararray,field2: chararray,field3: chararray}
(10,2014-09-16,oops,was,missed)
(11,2014-09-17,base,base,base)
(12,2014-09-17,base,CHANGED,base)
(13,2014-09-17,base,base,base)
(14,2014-09-18,base,CHANGED,base)
(15,2014-09-18,base,base,base)
(16,2014-09-18,base,CHANGED,base)
(17,2014-09-19,base,base,base)
(18,2014-09-19,base,base,base)
(19,2014-09-19,base,base,base)
(20,2014-09-20,base,base,base)
(21,2014-09-20,base,base,base)
(22,2014-09-20,base,base,base)
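If you also want to persist the merged result back to HDFS rather than just DUMP it, a minimal sketch is a STORE statement at the end of the same script. The output path below is only an illustrative placeholder; adjust it for your environment.

-- Write the merged relation back to HDFS as comma-delimited text.
-- Note: the target directory must not already exist when the job runs.
STORE merged INTO '/user/maria_dev/hcc/86778/merged' USING PigStorage(',');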
Good luck and happy Hadooping!
Created 03-07-2017 06:09 AM
Thank you very much @Lester Martin! This is exactly what I was looking for. If you don't mind, I have another related question. This logic has to be applied to more than 36 different files. In database terms, one of the files uses the ID and CreateDate fields as its primary key, and these fields are used as foreign keys in the rest of the files.
* The files are dropped daily into a Hadoop local directory
* The files have the current date appended to their file names
So, I need to read all the files from the Hadoop local directory, apply the above logic to each of them, and then store the results in HDFS. Is Pig an optimal (or even feasible) solution for my use case? Currently, I do this with a C# program that reads the files, applies the logic, and inserts the results into a relational database. The reason I am looking at Pig is to improve the performance of the ETL process.
Any recommendation on this please?
Thanks!
Created 03-07-2017 06:27 AM
Pig is definitely an option, but a couple of points. If you only do this once a month and have all the daily files (say, the 1st through the 31st of the month), then understand that Pig doesn't do simple control-loop logic (as identified in the presentation my answer above points you to), so you'd have to wrap it with some controlling script or similar. But on the other hand, if you get a new daily file each day, then Pig is going to be your best friend, since the previous day's finalized file becomes the new "origs" file from above and you just do the delta processing one file at a time.
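One common way to handle the "controlling script" part is to parameterize the Pig script and have your scheduler (or a shell loop) pass in each day's file names. Here is a rough sketch under assumed parameter names ($ORIGS, $DELTA, $OUT) and a hypothetical script name merge_daily.pig; these are illustrations, not anything from the thread above.

-- merge_daily.pig: parameterized version of the merge logic from the answer above.
-- $ORIGS = yesterday's finalized file, $DELTA = today's dropped file, $OUT = new finalized output.
origs = LOAD '$ORIGS' USING PigStorage(',') AS
    ( bogus_id:int, date_cr:chararray,
      field1:chararray, field2:chararray, field3:chararray );
delta = LOAD '$DELTA' USING PigStorage(',') AS
    ( bogus_id:int, date_cr:chararray,
      field1:chararray, field2:chararray, field3:chararray );
joined = JOIN origs BY bogus_id FULL OUTER, delta BY bogus_id;
merged = FOREACH joined GENERATE
    ((delta::bogus_id is not null) ? delta::bogus_id : origs::bogus_id) AS bogus_id,
    ((delta::date_cr  is not null) ? delta::date_cr  : origs::date_cr)  AS date_cr,
    ((delta::field1   is not null) ? delta::field1   : origs::field1)   AS field1,
    ((delta::field2   is not null) ? delta::field2   : origs::field2)   AS field2,
    ((delta::field3   is not null) ? delta::field3   : origs::field3)   AS field3;
STORE merged INTO '$OUT' USING PigStorage(',');

A daily run might then look something like pig -param ORIGS=<yesterday's output> -param DELTA=<today's file> -param OUT=<today's output> merge_daily.pig, repeated per file by whatever wraps it (the paths are placeholders you would fill in).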
I'm sure there's more to it than I'm imagining, but that general pattern I quickly described is HIGHLY leveraged by many Pig users out there. Good luck & thanks for accepting my answer!
Created 03-07-2017 04:53 AM
Hey @Kibrom Gebrehiwot, just wondering if my answer below was able to help you out. If so, I'd sure appreciate it if you marked it "Best Answer" by clicking the "Accept" link at the bottom of it. 😉
If it isn't helpful, please add a comment to the answer and let me know what concerns you may still have. Thanks!