Update with sysdate -1 records and Deletes

Hi Team,

I have two questions. Can anyone help me?

1) How can I get the records updated since sysdate - 1 using NiFi? (Ex: the timestamp is 09:00 today, but when I fetch incrementally the processor needs to check from 09:01 yesterday onward, or else within the last 2 hours of the timestamp.)

2) When a delete happens in the source, how does it affect the target? I also need to store the deleted records in another table in the target. How can I achieve this using NiFi?

Thank you.

1 ACCEPTED SOLUTION

Master Guru

For #1 you can use QueryDatabaseTable or GenerateTableFetch -> ExecuteSQL, and set the Maximum Value Column property to your timestamp column. If you schedule the processor to run once a day, it will get all records added since the maximum timestamp observed the last time the processor ran. It doesn't apply any logic to the timestamp value itself (such as a sysdate - 1 window). Instead it keeps track of the maximum value it has seen so far, then adds a WHERE clause to the SQL statement to get all rows with a timestamp greater than that maximum. From those rows it records the new maximum value, and so on.
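To make the Maximum Value Column behaviour concrete, here is a minimal Python sketch of the same idea using an in-memory SQLite database. This is not NiFi code: the `orders` table, the `updated_at` column, and the `fetch_incremental` helper are all made up for illustration, and the `state` dict stands in for the state the processor keeps between runs.

```python
import sqlite3

def fetch_incremental(conn, table, max_col, state):
    """Fetch rows whose max_col is greater than the last maximum seen.

    `state` plays the role of the processor's stored state (hypothetical).
    Table and column names are trusted here; this is only a sketch.
    """
    last_max = state.get(max_col)
    sql = f"SELECT * FROM {table}"
    params = ()
    if last_max is not None:
        # Only rows strictly greater than the stored maximum are returned.
        sql += f" WHERE {max_col} > ?"
        params = (last_max,)
    cur = conn.execute(sql, params)
    names = [d[0] for d in cur.description]
    rows = cur.fetchall()
    if rows:
        idx = names.index(max_col)
        # Remember the new maximum for the next run.
        state[max_col] = max(r[idx] for r in rows)
    return rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, updated_at TEXT)")
# ISO-formatted timestamps compare correctly as strings.
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-01 09:00:00"), (2, "2024-01-01 09:30:00")],
)

state = {}
first = fetch_incremental(conn, "orders", "updated_at", state)   # initial load
conn.execute("INSERT INTO orders VALUES (3, '2024-01-02 08:00:00')")
second = fetch_incremental(conn, "orders", "updated_at", state)  # only the new row
third = fetch_incremental(conn, "orders", "updated_at", state)   # nothing new
```

Note that the filter depends only on the stored maximum, not on the current time, so how often the fetch runs determines how large each increment is.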

For #2 you'd need a way to know a row was deleted in the source. If you can intercept when the DELETE statement is issued to the source, you could at that time issue a DELETE to the target. Alternatively, if your database has Change Data Capture (CDC) support, you may be able to query its delta tables. For MySQL we have the CaptureChangeMySQL processor, which reads the binary logs and sends each event downstream in your flow. In that case you'd get an event for the delete, which you can convert into a SQL DELETE statement for PutSQL, or (better) use PutDatabaseRecord with the "statement.type" attribute, which you would set to the value of the "cdc.event.type" attribute via an UpdateAttribute processor. The "cdc.event.type" attribute is set by the CaptureChangeMySQL processor.
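The attribute mapping described above can be sketched in plain Python. This is only an illustration of the routing logic, not NiFi configuration. It assumes the event types of interest arrive as "insert", "update", and "delete". The `route_cdc_event` helper, the `target.table` attribute, and the `deleted_rows` audit table are hypothetical names added to show how deleted records could also be copied to a separate table.

```python
DML_EVENTS = {"insert", "update", "delete"}  # assumed event type values

def route_cdc_event(attributes, audit_table="deleted_rows"):
    """Return the attribute sets to send on to the database writer.

    Copies cdc.event.type into statement.type, as UpdateAttribute would.
    For deletes, also emits an INSERT aimed at a hypothetical audit table.
    """
    event_type = attributes.get("cdc.event.type", "").lower()
    if event_type not in DML_EVENTS:
        # Non-DML events (transaction markers, DDL, ...) are dropped here.
        return []
    outputs = [{**attributes, "statement.type": event_type.upper()}]
    if event_type == "delete":
        # Keep a copy of the deleted record in a separate table.
        outputs.append({
            **attributes,
            "statement.type": "INSERT",
            "target.table": audit_table,  # hypothetical attribute name
        })
    return outputs

delete_out = route_cdc_event({"cdc.event.type": "delete"})
update_out = route_cdc_event({"cdc.event.type": "update"})
other_out = route_cdc_event({"cdc.event.type": "commit"})
```

In an actual flow the second output would correspond to routing delete events to an additional PutDatabaseRecord configured for the audit table.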

Thanks Matt for the quick response. One more question: I'm using QueryDatabaseTable with ID as the Maximum Value Column. If my state is 100 and a new record is added in the source with ID 50, I don't get this record, because the processor only checks for values greater than the stored maximum. How can I get the records with lower values as well?

Master Guru

If your ID values can come in out of order, then ID is not a good choice for a Maximum Value Column. Usually timestamps or always-increasing values are used as Maximum Value Columns. If you don't have a column of this type, how would you know whether a row was new or not? In the worst case you could keep a duplicate copy of the table as GenerateTableFetch/QueryDatabaseTable last saw it, then do a JOIN against the current table to find the new rows, but that is very resource-intensive and does not scale well at all.
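For illustration, the worst-case approach could look roughly like the following Python/SQLite sketch. The `source_rows` and `seen_rows` tables and the `find_unseen` helper are made-up names. `seen_rows` is the duplicate copy of what has already been fetched, and a LEFT JOIN finds rows that exist in the source but not in the copy, regardless of their ID order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source_rows (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE seen_rows   (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO source_rows VALUES (100, 'already fetched');
INSERT INTO seen_rows   VALUES (100, 'already fetched');
-- A late row with a lower ID, which a max-value filter (id > 100) would miss.
INSERT INTO source_rows VALUES (50, 'late arrival');
""")

def find_unseen(conn):
    """Return source rows missing from the copy, then add them to the copy."""
    rows = conn.execute("""
        SELECT s.id, s.name
        FROM source_rows s
        LEFT JOIN seen_rows k ON k.id = s.id
        WHERE k.id IS NULL
        ORDER BY s.id
    """).fetchall()
    conn.executemany("INSERT INTO seen_rows VALUES (?, ?)", rows)
    return rows

new_rows = find_unseen(conn)  # picks up the row with id 50
again = find_unseen(conn)     # nothing left to pick up
```

Every run compares the full source table against the full copy, which is why this gets expensive as the table grows.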

@Matt Burgess

Thank you.