
SQL table Tracking the latest record and update


I have a table, id_track history, which records id updates at different timestamps. I want to consolidate each id into its latest id by searching iteratively in SQL. How can I do this in Hive or Pig?

Table: 
OLD_ID  NEW_ID  TIME-STAMP
101     103     1/5/2001
102     108     2/5/2001
103     105     3/5/2001
105     106     4/5/2001
110     111     4/5/2001
108     116     14/5/2001
112     117     4/6/2001
104     118     4/7/2001
111     119     4/8/2001


Desired Resulting table: 
OLD_ID  LATEST_ID  LAST TIME-STAMP
101     106        4/5/2001
102     116        14/5/2001
104     118        4/7/2001
110     111        4/5/2001
112     117        4/6/2001
111     119        4/8/2001






Re: SQL table Tracking the latest record and update

@Ashish Vishnoi

This can be achieved in two ways: with a CTE or with a custom UDF.
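
As a rough illustration of the lookup a custom UDF would have to perform, here is a minimal plain-Python sketch. It assumes the whole mapping fits in memory as a dictionary. The function name `resolve_latest` and the `seen` cycle guard are illustrative only and are not part of Hive or Pig.

```python
# Sample rows from the question: (old_id, new_id, timestamp as d/m/yyyy text).
rows = [
    (101, 103, "1/5/2001"), (102, 108, "2/5/2001"), (103, 105, "3/5/2001"),
    (105, 106, "4/5/2001"), (110, 111, "4/5/2001"), (108, 116, "14/5/2001"),
    (112, 117, "4/6/2001"), (104, 118, "4/7/2001"), (111, 119, "4/8/2001"),
]

# old_id -> (new_id, timestamp of that update)
step = {old: (new, ts) for old, new, ts in rows}


def resolve_latest(old_id):
    """Follow old_id -> new_id links until an id has no further update."""
    current, last_ts = old_id, None
    seen = set()  # guard against cyclic data (assumption: cycles are errors)
    while current in step and current not in seen:
        seen.add(current)
        current, last_ts = step[current]
    return current, last_ts


print(resolve_latest(101))  # (106, '4/5/2001')
print(resolve_latest(102))  # (116, '14/5/2001')
```

Following the chain all the way maps 110 to 119, whereas the desired table in the question lists 110 to 111 and 111 to 119 as separate rows, so the expected handling of that case may need to be confirmed.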

Re: SQL table Tracking the latest record and update

@Ashish Vishnoi

I would personally do it in Pig.

Reasons:

1. Code Maintainability

2. Performance

Below are the steps:

r1 = old_id, ts
r2 = new_id, ts
r1_sorted = min(old_id) as id1, ts (grouped by ts)
r2_sorted = max(new_id) as id2, ts (grouped by ts)
result = r1_sorted.id1, r2_sorted.id2, ts (joining r1_sorted and r2_sorted on ts)

Let me know if you face any problems coding it in Pig.

Thanks

Re: SQL table Tracking the latest record and update

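I found a working answer: build a DataFrame in PySpark, left join it with itself, and split the rows into those where the join found no match (nulls) and those where it did (not nulls). Put everything into a loop that runs until no unresolved rows remain, that is, until df.take(1) == [].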

---------------------------------------------------------------------------

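from pyspark.sql.functions import col

# df_tr initially holds the source table with columns old_id and new_id.
df = df_tr.select(col("old_id"), col("new_id")).distinct()

# Fixed lookup side of the self-join.
df2 = df

# Reuse df_tr as an empty accumulator for fully resolved (old_id, latest id) rows.
df_tr = spark.createDataFrame([], df.schema)

# Each pass advances every unresolved row one hop; stop when none remain.
while df.take(1) != []:
    df = df.alias("df1").join(df2.alias("df2"), col("df1.new_id") == col("df2.old_id"), "left_outer")
    # No match in df2: df1.new_id is already the latest id.
    df_null = df.filter(col("df2.new_id").isNull()).select(col("df1.old_id"), col("df1.new_id"))
    # Match found: carry the newer id into the next pass.
    df = df.filter(col("df2.new_id").isNotNull()).select(col("df1.old_id"), col("df2.new_id"))
    df_tr = df_null.union(df_tr)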
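
To trace what the loop does without a Spark cluster, the same idea can be mimicked in plain Python on the sample data from the question. This is only a sketch: the names `lookup`, `pending`, and `resolved` are stand-ins for df2, df, and df_tr, and it assumes the id chains contain no cycles (otherwise the loop would not terminate).

```python
# (old_id, new_id) pairs from the question's table.
pairs = {(101, 103), (102, 108), (103, 105), (105, 106), (110, 111),
         (108, 116), (112, 117), (104, 118), (111, 119)}

lookup = dict(pairs)   # plays the role of df2: old_id -> new_id
pending = set(pairs)   # plays the role of df: rows still being resolved
resolved = set()       # plays the role of df_tr: finished rows

while pending:
    next_pending = set()
    for old_id, new_id in pending:
        if new_id in lookup:
            # Join matched: advance one hop along the chain.
            next_pending.add((old_id, lookup[new_id]))
        else:
            # No match (the null side of the left join): chain is finished.
            resolved.add((old_id, new_id))
    pending = next_pending

print(sorted(resolved))
# [(101, 106), (102, 116), (103, 106), (104, 118), (105, 106),
#  (108, 116), (110, 119), (111, 119), (112, 117)]
```

Every old_id ends up in the result, including intermediate ones such as 103 and 105. If only the starting ids are wanted, one option is to drop rows whose old_id also appears as a new_id in the source table.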