How to use Pig to replace records from a relation with records from another relation based on some condition

I have two files on HDFS. One of them (the latest file) contains updates to the other (the previous file). I want to check whether the values of specific columns in the latest file also exist in the previous file and, where they do, replace those records in the previous file with the records from the latest file (that is, delete the matching records from the previous file and put the records from the latest file in their place).

That means I need to compare each record of the previous file with each record of the latest file on specific columns. If a match is found, the whole record is deleted from the previous file and replaced with the record from the latest file.

How can I achieve this with Pig?

thanks!

1 ACCEPTED SOLUTION

Great question. My solution below is trimmed down from the presentation described at https://martin.atlassian.net/wiki/x/GYBzAg, which covers a much bigger topic.

So, let's assume you have this original file, which has an ID, a creation date, and three "payload" attributes.

[root@sandbox ~]# hdfs dfs -cat origs.csv
11,2014-09-17,base,base,base
12,2014-09-17,base,base,base
13,2014-09-17,base,base,base
14,2014-09-18,base,base,base
15,2014-09-18,base,base,base
16,2014-09-18,base,base,base
17,2014-09-19,base,base,base
18,2014-09-19,base,base,base
19,2014-09-19,base,base,base

Now, let's assume you have a delta file that has 4 new records (IDs 10, 20, 21, and 22) as well as more recent versions of 3 other records (IDs 12, 14, and 16).

[root@sandbox ~]# hdfs dfs -cat delta.csv
10,2014-09-16,oops,was,missed
20,2014-09-20,base,base,base
21,2014-09-20,base,base,base
22,2014-09-20,base,base,base
12,2014-09-17,base,CHANGED,base
14,2014-09-18,base,CHANGED,base
16,2014-09-18,base,CHANGED,base 

Then, in a Pig script you could join these like this.

origs = LOAD '/user/maria_dev/hcc/86778/original.csv'
          USING PigStorage(',') AS
            ( bogus_id:int, date_cr: chararray,
              field1:chararray, field2:chararray, field3:chararray );

delta = LOAD '/user/maria_dev/hcc/86778/delta.csv'
          USING PigStorage(',') AS
            ( bogus_id:int, date_cr: chararray,
              field1:chararray, field2:chararray, field3:chararray );

-- FULL OUTER keeps IDs that appear in only one of the two relations
joined = JOIN origs BY bogus_id FULL OUTER, delta BY bogus_id;

DESCRIBE joined;
DUMP joined; 

And get this output.

joined: {origs::bogus_id: int,origs::date_cr: chararray,origs::field1: chararray,origs::field2: chararray,origs::field3: chararray,delta::bogus_id: int,delta::date_cr: chararray,delta::field1: chararray,delta::field2: chararray,delta::field3: chararray}
(,,,,,10,2014-09-16,oops,was,missed)
(11,2014-09-17,base,base,base,,,,,)
(12,2014-09-17,base,base,base,12,2014-09-17,base,CHANGED,base)
(13,2014-09-17,base,base,base,,,,,)
(14,2014-09-18,base,base,base,14,2014-09-18,base,CHANGED,base)
(15,2014-09-18,base,base,base,,,,,)
(16,2014-09-18,base,base,base,16,2014-09-18,base,CHANGED,base)
(17,2014-09-19,base,base,base,,,,,)
(18,2014-09-19,base,base,base,,,,,)
(19,2014-09-19,base,base,base,,,,,)
(,,,,,20,2014-09-20,base,base,base)
(,,,,,21,2014-09-20,base,base,base)
(,,,,,22,2014-09-20,base,base,base) 

You'll see above that when the delta record's fields are present (the ones on the right side), they should be the ones carried forward, since they belong to either new (4) or modified (3) records. When they are missing (6), the original values should just roll forward.

-- For each column, prefer the delta value when it is present; otherwise keep the original
merged = FOREACH joined GENERATE
  ((delta::bogus_id is not null) ? delta::bogus_id: origs::bogus_id) as bogus_id,
  ((delta::date_cr  is not null) ? delta::date_cr:  origs::date_cr)  as date_cr,
  ((delta::field1   is not null) ? delta::field1:   origs::field1)   as field1,
  ((delta::field2   is not null) ? delta::field2:   origs::field2)   as field2,
  ((delta::field3   is not null) ? delta::field3:   origs::field3)   as field3;

DESCRIBE merged;
DUMP merged; 

As you can see from the combined output, we have the necessary 13 rows in the new dataset.

merged: {bogus_id: int,date_cr: chararray,field1: chararray,field2: chararray,field3: chararray}
(10,2014-09-16,oops,was,missed)
(11,2014-09-17,base,base,base)
(12,2014-09-17,base,CHANGED,base)
(13,2014-09-17,base,base,base)
(14,2014-09-18,base,CHANGED,base)
(15,2014-09-18,base,base,base)
(16,2014-09-18,base,CHANGED,base)
(17,2014-09-19,base,base,base)
(18,2014-09-19,base,base,base)
(19,2014-09-19,base,base,base)
(20,2014-09-20,base,base,base)
(21,2014-09-20,base,base,base)
(22,2014-09-20,base,base,base) 
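
One caveat with the per-column test above: if a delta record matched but one of its fields is empty in the file (PigStorage loads an empty field as null), that column falls back to the original value. A minimal sketch of a record-level variant follows, assuming the same joined relation as above. The names merged_by_record and ordered and the output directory are made up for illustration and are not part of the original answer.

```pig
-- Sketch only: decide per record, not per column. When the delta side of the
-- join matched (delta::bogus_id is not null), take every column from delta.
merged_by_record = FOREACH joined GENERATE
  ((delta::bogus_id IS NOT NULL) ? delta::bogus_id : origs::bogus_id) AS bogus_id,
  ((delta::bogus_id IS NOT NULL) ? delta::date_cr  : origs::date_cr)  AS date_cr,
  ((delta::bogus_id IS NOT NULL) ? delta::field1   : origs::field1)   AS field1,
  ((delta::bogus_id IS NOT NULL) ? delta::field2   : origs::field2)   AS field2,
  ((delta::bogus_id IS NOT NULL) ? delta::field3   : origs::field3)   AS field3;

-- Optional: sort by ID, then persist to HDFS instead of dumping to the console.
-- The output directory is hypothetical and must not already exist.
ordered = ORDER merged_by_record BY bogus_id;
STORE ordered INTO '/user/maria_dev/hcc/86778/merged' USING PigStorage(',');
```

On the sample data, which has no empty fields, this should give the same 13 rows as the per-column version. The two only differ when a delta record carries an empty column.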

Good luck and happy Hadooping!

Thank you very much @Lester Martin! This is exactly what I was looking for. If you don't mind, I have another related question. This logic has to be applied to more than 36 different files. In database terms, one of the files uses the ID and CreateDate fields as its primary key, and these fields are used as foreign keys in the rest of the files.

* The files are dropped daily into a Hadoop local directory
* The files have the current date appended to their file names

So, I need to read all the files from the Hadoop local directory, apply the above logic to each of them, and then store the results in HDFS. Is Pig the optimal (or even a feasible) solution for my use case? Currently, I do this with a C# program that reads the files, applies the logic, and inserts the results into a relational database. I am looking at Pig to improve the performance of the ETL process.

Any recommendations on this, please?

Thanks!

Pig is definitely an option, but a couple of points. If you only do this once a month and have all the daily files (say, the 1st through the 31st of the month), then understand that Pig doesn't do simple control-loop logic (as identified in the presentation that my answer above points you to), so you'd have to wrap it with some controlling script. On the other hand, if you get a new daily file each day, then Pig is going to be your best friend: the previous day's finalized file becomes the new "origs" file from above, and you just do the delta processing one file at a time.

I'm sure there's more to it than I'm imagining, but that general pattern I quickly described is HIGHLY leveraged by many Pig users out there. Good luck & thanks for accepting my answer!
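
To make the "one delta file at a time" pattern concrete, here is a rough sketch of a parameterized script that a wrapper (cron job, shell script, or similar) could call once per day. It is an illustration under assumptions, not a tested pipeline: the script name daily_merge.pig, the parameter names PREV, DELTA and OUT, and the example paths are all hypothetical, and it assumes the sample schema from the answer above with the (ID, CreateDate) pair as the join key.

```pig
-- daily_merge.pig (hypothetical name): one day's delta run.
-- A wrapper script supplies the three parameters, for example:
--   pig -param PREV=/data/merged/2014-09-19 \
--       -param DELTA=/data/incoming/delta_2014-09-20.csv \
--       -param OUT=/data/merged/2014-09-20 \
--       daily_merge.pig
-- All three paths above are made-up examples.

origs = LOAD '$PREV' USING PigStorage(',') AS
          ( bogus_id:int, date_cr:chararray,
            field1:chararray, field2:chararray, field3:chararray );

delta = LOAD '$DELTA' USING PigStorage(',') AS
          ( bogus_id:int, date_cr:chararray,
            field1:chararray, field2:chararray, field3:chararray );

-- Assumption: the key is the (ID, CreateDate) pair, so join on both columns.
joined = JOIN origs BY (bogus_id, date_cr) FULL OUTER,
              delta BY (bogus_id, date_cr);

-- Take the delta record whenever the delta side of the join matched.
merged = FOREACH joined GENERATE
  ((delta::bogus_id IS NOT NULL) ? delta::bogus_id : origs::bogus_id) AS bogus_id,
  ((delta::bogus_id IS NOT NULL) ? delta::date_cr  : origs::date_cr)  AS date_cr,
  ((delta::bogus_id IS NOT NULL) ? delta::field1   : origs::field1)   AS field1,
  ((delta::bogus_id IS NOT NULL) ? delta::field2   : origs::field2)   AS field2,
  ((delta::bogus_id IS NOT NULL) ? delta::field3   : origs::field3)   AS field3;

-- Today's output directory becomes the previous-day input of tomorrow's run.
STORE merged INTO '$OUT' USING PigStorage(',');
```

Files that land in a local directory would first need to be copied into HDFS (for instance with hdfs dfs -put) before the script can load them. Each of the 36 files would need its own schema and key columns in place of the sample ones used here.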

Hey @Kibrom Gebrehiwot, just wondering if my answer above was able to help you out. If so, I'd sure appreciate it if you marked it "Best Answer" by clicking the "Accept" link at the bottom of it. 😉

If it isn't helpful, please add a comment to the answer and let me know what concerns you may still have. Thanks!