How to read delta records from MS SQL using NiFi/HDF

I have a few tables in MS SQL. These tables get updated every second, and the query looks more or less like this:

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastG_ID}

Let us assume the select inner join query returns 5 records, as shown in the Capture1 image. When the query runs for the first time, ${lastUpdateTime} and ${lastG_ID} are set to 0, so it returns those 5 records. After processing the records, the job stores max(G_ID), i.e. 5, and max(UpdateTime), i.e. 1512010479, in the "etl_stat" table.

Now suppose another 5 new records are added to the table, as shown in the Capture2 image:

43781-capture2.png

The job first reads max(G_ID) and max(UpdateTime) from the "etl_stat" table and frames the query as follows:

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID  WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

The query then returns only the 5 delta records, as shown in the Capture3 image.

43782-capture3.png

So every time the query runs, it should first read max(G_ID) and max(UpdateTime) from the etl_stat table, frame the select inner join query as shown above, and fetch the delta changes.
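The query framing described above can be sketched as a small function. This is only an illustration: the function name frame_delta_query and the use of "?" bind parameters (instead of substituting the values into the SQL text) are assumptions, not part of the original job.

```python
# Delta query from the post, with "?" placeholders for the two watermarks.
BASE_QUERY = (
    "SELECT G_ID, UpdateTime, ID, Name, T_NAME "
    "FROM TABLE1 AS table1 "
    "INNER JOIN TABLE2 AS table2 ON table1.IP = table2.ID "
    "WHERE table2.UpdateTime >= ? AND table2.G_ID > ?"
)


def frame_delta_query(etl_stat_row):
    """Return (sql, params) for the next delta read.

    etl_stat_row is the (max_g_id, max_update_time) pair read from etl_stat,
    or None on the very first run, in which case both watermarks are 0.
    """
    last_g_id, last_update_time = etl_stat_row or (0, 0)
    # Parameter order follows the WHERE clause: UpdateTime first, then G_ID.
    return BASE_QUERY, (last_update_time, last_g_id)


sql, params = frame_delta_query(None)
print(params)  # first run: (0, 0)

sql, params = frame_delta_query((5, 1512010479))
print(params)  # (1512010479, 5)
```

Binding the watermarks as parameters keeps the SQL text constant between runs, which is usually easier to log and safer than string substitution.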

AS IS ARCHITECTURE USING SPARK SQL:

I have implemented the above use case as follows:

1) Spark JDBC reads the Phoenix table etl_stat to get max(G_ID) and max(UpdateTime).

2) Spark JDBC frames the select inner join query like this

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID  WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5 

3) Spark JDBC runs the inner join query from step 2, reads the delta records from MS SQL Server, processes them, and inserts them into HBase.

4) After a successful insert into HBase, Spark updates the etl_stat table with the latest G_ID, i.e. 10, and UpdateTime, i.e. 1512010500.

5) The job is scheduled with cron to run every minute.
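The five steps above can be simulated end to end as a rough sketch. Everything here is a stand-in: an in-memory SQLite database replaces both MS SQL and the Phoenix etl_stat table, a plain list replaces HBase, and the split of columns between TABLE1 and TABLE2, the etl_stat column names, and the helper add_rows are assumptions, since the post does not give them.

```python
import sqlite3

# In-memory SQLite stands in for MS SQL (source) and Phoenix (etl_stat).
# The column-to-table split below is an assumption; the post does not give it.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE TABLE1 (IP INTEGER, T_NAME TEXT);
    CREATE TABLE TABLE2 (G_ID INTEGER, UpdateTime INTEGER, ID INTEGER, Name TEXT);
    CREATE TABLE etl_stat (max_g_id INTEGER, max_update_time INTEGER);
    INSERT INTO etl_stat VALUES (0, 0);
""")

DELTA_SQL = """
    SELECT G_ID, UpdateTime, ID, Name, T_NAME
    FROM TABLE1 AS table1
    INNER JOIN TABLE2 AS table2 ON table1.IP = table2.ID
    WHERE table2.UpdateTime >= ? AND table2.G_ID > ?
    ORDER BY G_ID
"""


def add_rows(first_g_id, last_g_id, update_time):
    """Hypothetical helper: simulate new source rows arriving."""
    for g in range(first_g_id, last_g_id + 1):
        db.execute("INSERT INTO TABLE1 VALUES (?, ?)", (g, f"t{g}"))
        db.execute("INSERT INTO TABLE2 VALUES (?, ?, ?, ?)",
                   (g, update_time, g, f"n{g}"))


def run_once(sink):
    """One scheduled run: steps 1 to 4 of the list above."""
    # Step 1: read the stored watermarks.
    last_g_id, last_ts = db.execute(
        "SELECT max_g_id, max_update_time FROM etl_stat").fetchone()
    # Steps 2 and 3: frame and run the delta query.
    rows = db.execute(DELTA_SQL, (last_ts, last_g_id)).fetchall()
    if not rows:
        return 0
    # Step 3 (continued): write the delta rows to the sink.
    sink.extend(rows)
    # Step 4: advance the watermark only after the sink write succeeded.
    db.execute("UPDATE etl_stat SET max_g_id = ?, max_update_time = ?",
               (max(r[0] for r in rows), max(r[1] for r in rows)))
    return len(rows)


hbase = []                      # a plain list stands in for HBase
add_rows(1, 5, 1512010479)
print(run_once(hbase))          # 5
add_rows(6, 10, 1512010500)
print(run_once(hbase))          # 5
print(run_once(hbase))          # 0, nothing new since the last run
```

Step 5 (the cron schedule) corresponds to calling run_once once per minute.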

TO BE ARCHITECTURE USING NIFI:

I want to move this use case to NiFi: NiFi should read the records from the MS SQL database and send them to Kafka.

After successfully publishing to Kafka, NiFi will save G_ID and UpdateTime in the database. Once the messages reach Kafka, Spark Streaming will read them and save them to HBase using the existing business logic.

On every run, the NiFi processor should frame the select inner join query using max(G_ID) and max(UpdateTime) in order to get the delta records and publish them to Kafka. I am new to NiFi/HDF.
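The ordering described above (publish first, save G_ID and UpdateTime only afterwards) can be sketched as follows. This is not NiFi code: the publish argument is a hypothetical callable standing in for a Kafka producer's send, and the JSON message layout keyed by G_ID is an assumption made for illustration.

```python
import json


def to_messages(rows):
    """Turn delta rows into (key, value) byte pairs, keyed by G_ID."""
    cols = ("G_ID", "UpdateTime", "ID", "Name", "T_NAME")
    for row in rows:
        record = dict(zip(cols, row))
        yield str(record["G_ID"]).encode(), json.dumps(record).encode()


def publish_then_save(rows, publish, watermark):
    """Publish every row, then advance the watermark.

    `publish` is a hypothetical callable standing in for a Kafka producer's
    send. If it raises, the watermark is left untouched, so the same rows
    are read again on the next run (at-least-once delivery).
    """
    for key, value in to_messages(rows):
        publish(key, value)
    if rows:
        watermark["G_ID"] = max(r[0] for r in rows)
        watermark["UpdateTime"] = max(r[1] for r in rows)
    return watermark


sent = []
wm = {"G_ID": 5, "UpdateTime": 1512010479}
rows = [(6, 1512010500, 6, "n6", "t6"), (7, 1512010500, 7, "n7", "t7")]
publish_then_save(rows, lambda k, v: sent.append((k, v)), wm)
print(wm)  # {'G_ID': 7, 'UpdateTime': 1512010500}


def broken(key, value):
    """Simulate a failed publish."""
    raise ConnectionError("broker unavailable")


try:
    publish_then_save([(8, 1512010600, 8, "n8", "t8")], broken, wm)
except ConnectionError:
    pass
print(wm)  # unchanged: {'G_ID': 7, 'UpdateTime': 1512010500}
```

Because a failed run is retried from the old watermark, the downstream Spark Streaming job may see a record more than once and should tolerate duplicates.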

I need your help and guidance to implement this using NiFi/HDF. If you have a better solution or architecture for this use case, please suggest it.

Sorry for such a long post.

43780-capture1.png

5 REPLIES

Re: How to read delta records from MS SQL using NiFi/HDF

Hi @Nilesh Pandey

QueryDatabaseTable has native support for delta reads: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.4.0/org.apache...

You can use the Maximum-value Columns property to define which columns are used to detect new rows. NiFi keeps the last seen maximum values in its internal state and, on each run, fetches only rows beyond those values.

Can you look at this processor and see if it helps?
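To make the state mechanism concrete, here is a rough simulation of how maximum-value tracking can work. It is not NiFi's actual code: build_query and update_state are hypothetical names, and the choice of ">" for the first column and ">=" for later ones is an assumption that should be checked against the documentation for your NiFi version.

```python
def build_query(table, max_value_columns, state):
    """Build a SELECT that only fetches rows beyond the stored maxima.

    Assumption: the first maximum-value column is compared with '>' and
    later ones with '>='; check the documentation for your NiFi version.
    """
    clauses = []
    for i, col in enumerate(max_value_columns):
        if col in state:
            op = ">" if i == 0 else ">="
            clauses.append(f"{col} {op} {state[col]}")
    sql = f"SELECT * FROM {table}"
    return sql + (" WHERE " + " AND ".join(clauses) if clauses else "")


def update_state(state, max_value_columns, rows):
    """Remember the largest value seen so far for each tracked column."""
    for row in rows:
        for col in max_value_columns:
            if col not in state or row[col] > state[col]:
                state[col] = row[col]


state = {}
cols = ["G_ID", "UpdateTime"]

# First run: no state yet, so the whole table is read.
print(build_query("TABLE2", cols, state))

# After the first batch, the maxima are stored and used on the next run.
update_state(state, cols, [{"G_ID": 5, "UpdateTime": 1512010479}])
print(build_query("TABLE2", cols, state))
```

In this sketch the state dictionary plays the role that the etl_stat table plays in the Spark job, so no separate bookkeeping table is needed.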

Re: How to read delta records from MS SQL using NiFi/HDF

@Abdelkrim Hadjidj Thank you for the quick response. I will check at my end and let you know. Just one more thing: do you see this as a better architecture compared to the AS IS one?

Re: How to read delta records from MS SQL using NiFi/HDF

Hi @Abdelkrim Hadjidj, if possible, can you please share how to configure QueryDatabaseTable for my query and columns? I cannot see any field in the QueryDatabaseTable processor where I can enter my select inner join query.

Re: How to read delta records from MS SQL using NiFi/HDF

Hi @Abdelkrim Hadjidj, I am not able to use QueryDatabaseTable for this use case, as mentioned in my previous comment. Can you please help me with this? I am badly stuck.

Re: How to read delta records from MS SQL using NiFi/HDF

Hi @Nilesh Pandey, from your answer I gather that your problem was with a row version column. Were you able to solve the issue? I'm facing exactly the same problem.
