
Pig generate a keychange column - comparing previous record with current record but different column.


Hi,

My input data is in the following format:

#  col1  col2  col3    col4 (effective date)  col5 (expiry date)
1  Q1    A1    Value1  01/01                  01/02
2  Q1    A1    Value1  01/02                  01/03
3  Q1    A1    Value1  01/03                  01/05
4  Q1    A1    Value2  01/05                  01/06
5  Q1    A1    Value2  01/06                  01/07
6  Q1    A1    Value2  01/07                  01/08
7  Q1    A1    Value1  01/08                  01/11
8  Q1    A1    Value1  01/11                  12/31

I need to remove duplicates based on the values of col1, col2 and col3, but not all of them. Records count as duplicates only until the value of col3 changes. For example, in the data above Value1 changes to Value2 at the 4th record, so among records 1, 2 and 3 only the 1st should be retained. Likewise, among records 4, 5 and 6 only the 4th should be retained, and among records 7 and 8 only the 7th. The last two columns are date columns (effective date and expiry date).

A run of duplicates like records 1, 2 and 3 can be of any length (for example, records 1 through 5 could all have the same value), or there could be no duplicates at all.
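
For reference, the rule above (keep the first record of each consecutive run of identical col1, col2, col3) can be pinned down with a small plain-Python sketch. This is not Pig code, only an illustration of the expected result. It assumes the rows are already sorted by effective date, and the name first_of_each_run is made up for the example.

```python
from itertools import groupby

# Sample rows from the question: (col1, col2, col3, effective, expiry).
ROWS = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]


def first_of_each_run(rows):
    """Keep only the first row of each consecutive run sharing (col1, col2, col3).

    Assumes `rows` is already sorted by effective date.
    """
    # groupby only merges adjacent equal keys, so a value that
    # reappears later (Value1 in rows 7 and 8) starts a new run.
    return [next(run) for _, run in groupby(rows, key=lambda r: r[:3])]


if __name__ == "__main__":
    for row in first_of_each_run(ROWS):
        print(",".join(row))
```

On the sample data this keeps records 1, 4 and 7, which is the result described above.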

I have two approaches in mind, but I am not sure how to code either of them.

1. Generate a keychange column (1 or 0): it is set to 1 on the first record of each new key (the combination of col1, col2, col3) and to 0 for the duplicates that follow, so I could then filter on this column. I think this needs a UDF (or is there an existing UDF with similar functionality?). Since the UDF would need its input in sorted order, is it possible to pass sorted data to a UDF, and if so, how? What kind of UDF should it be? Alternatively, if I write MapReduce code, how should I proceed: should I just emit the records in the mapper and do all the sorting and column generation in the reducer? Please let me know your inputs (I am new to MapReduce programming, so your ideas will help me a lot in learning, thanks!).

2. From the documentation of the "Over" function, it seems to compare only the same column of the previous and current records. If I could somehow compare col5 (expiry date) of the current record with col4 (effective date) of the next record, after sorting by col4 (effective date) in ascending order, I could group by col1, col2 and col3 and eliminate the records whose effective date equals the previous record's expiry date. But I am not sure how to compare two different columns using the Over function.
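
Both approaches can be sketched in plain Python to make the intended logic concrete. This is not a Pig UDF or a MapReduce job, only a model of what each one would compute. It assumes the dates are zero-padded MM/DD strings within one year (so they sort correctly as text), and the function names add_keychange and drop_chained_records are hypothetical.

```python
# Sample rows from the question: (col1, col2, col3, effective, expiry).
ROWS = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]


def sort_rows(rows):
    # Sort by col1, col2, then effective date (assumed zero-padded MM/DD).
    return sorted(rows, key=lambda r: (r[0], r[1], r[3]))


def add_keychange(rows):
    """Approach 1: append 1 when (col1, col2, col3) differs from the previous row, else 0."""
    flagged = []
    prev_key = None
    for row in sort_rows(rows):
        key = row[:3]
        flagged.append(row + (1 if key != prev_key else 0,))
        prev_key = key
    return flagged


def drop_chained_records(rows):
    """Approach 2: within each (col1, col2, col3) group, drop a record whose
    effective date equals the expiry date of the previous record in that group."""
    last_expiry = {}  # key -> expiry date of the previous record with that key
    kept = []
    for row in sort_rows(rows):
        key = row[:3]
        if last_expiry.get(key) != row[3]:
            kept.append(row)
        last_expiry[key] = row[4]
    return kept


if __name__ == "__main__":
    # Approach 1: filter on the keychange flag (the last field).
    print([r[:5] for r in add_keychange(ROWS) if r[5] == 1])
    # Approach 2: compare effective date with the previous expiry date.
    print(drop_chained_records(ROWS))
```

On the sample data both functions retain records 1, 4 and 7. Note that approach 2 depends on the date ranges being contiguous within a run: a gap between one record's expiry date and the next record's effective date would make the later record look like the start of a new run.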

Please let me know your thoughts on this one too.

Please let me know if there is another better way to solve this. Thank you for your time!


Re: Pig generate a keychange column - comparing previous record with current record but different column.


You can try something similar:

Input:

Q1,A1,Value1,01/01,01/02
Q1,A1,Value1,01/02,01/03
Q1,A1,Value1,01/03,01/05
Q1,A1,Value2,01/05,01/06
Q1,A1,Value2,01/06,01/07
Q1,A1,Value2,01/07,01/08
Q1,A1,Value3,01/08,01/11
Q1,A1,Value3,01/11,12/31

Output:

Q1,A1,Value1,01/01,01/02
Q1,A1,Value2,01/05,01/06
Q1,A1,Value3,01/08,01/11

register /usr/hdp/2.4.0.0-169/pig/lib/piggybank.jar; /* register path to piggybank jar */
define Stitch org.apache.pig.piggybank.evaluation.Stitch;
define Over org.apache.pig.piggybank.evaluation.Over('int');

A = load '/pigtest' using PigStorage(',') as (c1:chararray,c2:chararray,c3:chararray,c4:chararray,c5:chararray);

-- one group per distinct (c1,c2,c3) combination
B = group A by (c1,c2,c3);

-- order each group by effective date and attach a row number to every record
C = foreach B {
    order1 = order A by c1,c2,c3,c4;
    generate flatten(Stitch(order1,Over(order1,'row_number')));
};
D = foreach C generate stitched::c1 as c1, stitched::c2 as c2, stitched::c3 as c3, stitched::c4 as c4, stitched::c5 as c5, $5 as rn;

-- keep only the first record of each group
E = FILTER D BY rn == 1;
F = FOREACH E GENERATE c1,c2,c3,c4,c5;
DUMP F;
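
To make clear what the script computes, here is a plain-Python model of its logic (group by c1, c2, c3, order each group by c4, keep row number 1). It is not output from running Pig, and the function name first_per_key is made up for the example. One consequence worth checking against your data: the script keeps one record per distinct key, so it matches the expected result for the input shown here (Value1, Value2, Value3), but in the data from the original question, where Value1 reappears in records 7 and 8, those records fall into the same group as records 1 to 3 and record 7 is not retained.

```python
from collections import defaultdict

# Input used in this reply: the last two rows carry Value3.
REPLY_INPUT = [
    ("Q1", "A1", "Value1", "01/01", "01/02"),
    ("Q1", "A1", "Value1", "01/02", "01/03"),
    ("Q1", "A1", "Value1", "01/03", "01/05"),
    ("Q1", "A1", "Value2", "01/05", "01/06"),
    ("Q1", "A1", "Value2", "01/06", "01/07"),
    ("Q1", "A1", "Value2", "01/07", "01/08"),
    ("Q1", "A1", "Value3", "01/08", "01/11"),
    ("Q1", "A1", "Value3", "01/11", "12/31"),
]

# Input from the original question: Value1 reappears in the last two rows.
QUESTION_INPUT = REPLY_INPUT[:6] + [
    ("Q1", "A1", "Value1", "01/08", "01/11"),
    ("Q1", "A1", "Value1", "01/11", "12/31"),
]


def first_per_key(rows):
    """Model of the script: group by (c1, c2, c3), order by c4, keep row 1."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[:3]].append(row)
    # c4 is compared as text, as in the script (chararray); this works
    # here only because the sample dates are zero-padded MM/DD.
    # Pig does not guarantee DUMP order; keys are sorted here for a stable result.
    return [min(groups[key], key=lambda r: r[3]) for key in sorted(groups)]


if __name__ == "__main__":
    print(first_per_key(REPLY_INPUT))     # three rows: Value1, Value2, Value3
    print(first_per_key(QUESTION_INPUT))  # two rows: Value1, Value2
```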

Just let me know if it works for you.