
NiFi is most effectively used as an "always-on" system, meaning that the data flows are often always operational (running). Doing batch processing is a more difficult task and usually requires some user intervention (such as stopping a source processor).

For relational databases (RDBMS), a common use case is to migrate, replicate, or otherwise move the data from the source RDBMS to some target. If ExecuteSQL were used to get all the data from a table, for example, the processor would execute the query each time it is triggered to run and return whatever results match the query at that moment.

If the goal is simply to move all rows to some target, then ExecuteSQL could be started and then immediately stopped so that it executes only once, sending all results down the flow.

However, a more common use case is that the source database is being updated by some external process (a user, a web app, an ERP/CRM system, etc.). To get the new rows, the table needs to be queried again. But if the "old" rows have already been moved, re-running the same query would send all of them through the flow again as duplicates.
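To make the duplicate problem concrete, here is a small Python sketch. It uses an in-memory SQLite table as a stand-in for the source RDBMS (an assumption; NiFi itself talks to the database over JDBC), and a plain list named `moved` (hypothetical) plays the role of rows sent down the flow. Re-running a full-table query after a new row arrives resends the rows that were already moved:

```python
import sqlite3

# In-memory stand-in for the source table (illustrative assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Joe"), (2, "Mary")])

moved = []  # hypothetical: rows "sent down the flow"


def run_full_query() -> None:
    # Like a plain ExecuteSQL query: no memory of what was fetched before.
    moved.extend(conn.execute("SELECT * FROM users ORDER BY id").fetchall())


run_full_query()                                      # moves rows 1 and 2
conn.execute("INSERT INTO users VALUES (3, 'Matt')")  # external process adds a row
run_full_query()                                      # moves rows 1, 2 AND 3

print(len(moved))  # 5: rows 1 and 2 were sent twice
```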

As an alternative, the QueryDatabaseTable processor allows you to specify column(s) in a table whose values are increasing (such as an "ID" or "timestamp" column), and the processor will only retrieve rows from the table whose values in those columns are greater than the maximum value(s) observed so far.
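As a rough illustration of the query-building idea for a single maximum-value column, the hypothetical helper below returns an unfiltered query when nothing has been stored yet and a strict `>` filter otherwise. It is a sketch under simplifying assumptions (one integer column, the stored value interpolated as a literal so the output matches the queries shown in this article), not NiFi's code; multiple maximum-value columns are not modeled.

```python
from typing import Optional


def build_incremental_query(table: str, column: str, max_seen: Optional[int]) -> str:
    """Hypothetical helper: full query first, then only rows above the stored max."""
    if max_seen is None:
        # Nothing stored yet, so every row qualifies.
        return f"SELECT * FROM {table}"
    # Integer literal for readability; real code should not interpolate untrusted input.
    return f"SELECT * FROM {table} WHERE {column} > {int(max_seen)}"


print(build_incremental_query("users", "id", None))  # SELECT * FROM users
print(build_incremental_query("users", "id", 3))     # SELECT * FROM users WHERE id > 3
```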

To illustrate, consider the following database table called "users":

id  name  email              age
1   Joe   joe@fakemail.com   42
2   Mary  mary@fakemail.com  24
3   Matt  matt@fakemail.com  38

Here, QueryDatabaseTable would be configured to use a table name of "users" and a "Maximum-Value Column" of "id". When the processor runs the first time, it will not have seen any values from the "id" column and thus all rows will be returned. The query executed is:

SELECT * FROM users

However, after that query has completed, QueryDatabaseTable stores the maximum value it has seen for the "id" column; namely, 3.

Now let's say QueryDatabaseTable has been scheduled to run every 5 minutes, and the next time it runs, the table looks like this:

id  name     email                 age
1   Joe      joe@fakemail.com      42
2   Mary     mary@fakemail.com     24
3   Matt     matt@fakemail.com     38
4   Armando  armando@fakemail.com  20
5   Jim      jeff@fakemail.com     30

Because QueryDatabaseTable had stored the maximum value of 3 for the "id" column, this time when the processor runs it executes the following query:

SELECT * FROM users WHERE id > 3

Now only the last two rows (the recently added ones) are returned, and QueryDatabaseTable stores the new maximum value of 5.
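The two runs above can be simulated end to end in Python with `sqlite3`. The `IncrementalFetcher` class is hypothetical: its `max_seen` attribute stands in for the processor's stored state, and an in-memory SQLite table stands in for the real database. It is a minimal sketch of the max-value technique, not how QueryDatabaseTable is implemented:

```python
import sqlite3
from typing import List, Optional


class IncrementalFetcher:
    """Hypothetical sketch of max-value tracking; not NiFi's implementation."""

    def __init__(self, conn: sqlite3.Connection, table: str, column: str) -> None:
        self.conn = conn
        self.table = table
        self.column = column
        self.max_seen: Optional[int] = None  # stands in for the processor's state

    def fetch(self) -> List[sqlite3.Row]:
        sql = f"SELECT * FROM {self.table}"
        params: tuple = ()
        if self.max_seen is not None:
            # Only rows strictly greater than the largest value seen so far.
            sql += f" WHERE {self.column} > ?"
            params = (self.max_seen,)
        rows = self.conn.execute(sql + f" ORDER BY {self.column}", params).fetchall()
        if rows:
            # Rows are sorted, so the last one holds the new maximum.
            self.max_seen = rows[-1][self.column]
        return rows


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets us read columns by name, e.g. row["id"]
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?, ?)",
    [(1, "Joe", "joe@fakemail.com", 42),
     (2, "Mary", "mary@fakemail.com", 24),
     (3, "Matt", "matt@fakemail.com", 38)],
)

fetcher = IncrementalFetcher(conn, "users", "id")
first = fetcher.fetch()  # no state yet: all 3 rows, state becomes 3

conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?, ?)",
    [(4, "Armando", "armando@fakemail.com", 20),
     (5, "Jim", "jeff@fakemail.com", 30)],
)
second = fetcher.fetch()  # WHERE id > 3: rows 4 and 5, state becomes 5
print(len(first), [row["id"] for row in second], fetcher.max_seen)  # 3 [4, 5] 5
```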

Here is the concept applied to a NiFi flow:

[Figure: QueryDatabaseTable flow]

For this example, I have a "users" table containing many attributes about randomly generated users, and there are 100 records to start with:

[Figure: the "users" table with 100 records]

If we run QueryDatabaseTable, we see 100 separate flow files after the SplitAvro processor:

[Figure: QueryDatabaseTable flow with 100 flow files]

The flow remains this way until new data comes in (with an "id" value larger than 100). Adding one such row:

[Figure: a new row added to the "users" table]

Once this row is added, you can see the additional record move through the flow:

[Figure: QueryDatabaseTable flow with 101 flow files]

This approach works for any column whose values increase, such as timestamps. If you want to clear out the maximum value(s) from the processor's state, right-click the processor and choose View State. Then you can click "Clear State" in the dialog:

[Figure: QueryDatabaseTable View State dialog with the Clear State option]
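Clearing state and using a timestamp column can be sketched the same way. In this hypothetical example, a module-level `state` dict plays the role of the processor's state, `clear_state()` approximates the "Clear State" button, and the `events` table with ISO-8601 text timestamps is made up for illustration (strings in one fixed format compare in chronological order):

```python
import sqlite3
from typing import Dict, List

# Hypothetical stand-in for processor state: column name -> max value seen.
state: Dict[str, str] = {}


def fetch(conn: sqlite3.Connection, table: str, column: str) -> List[tuple]:
    """Fetch rows newer than the stored maximum (or all rows if none is stored)."""
    if column in state:
        cur = conn.execute(
            f"SELECT * FROM {table} WHERE {column} > ? ORDER BY {column}",
            (state[column],),
        )
    else:
        cur = conn.execute(f"SELECT * FROM {table} ORDER BY {column}")
    rows = cur.fetchall()
    if rows:
        # Rows are sorted by the column, so the last row holds the new maximum.
        idx = [d[0] for d in cur.description].index(column)
        state[column] = rows[-1][idx]
    return rows


def clear_state() -> None:
    """Rough equivalent of clicking "Clear State": forget every stored maximum."""
    state.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "2021-01-01 10:00:00"), (2, "2021-01-01 10:05:00")],
)

a = len(fetch(conn, "events", "updated_at"))  # first run, no state yet
b = len(fetch(conn, "events", "updated_at"))  # nothing newer than 10:05
clear_state()
c = len(fetch(conn, "events", "updated_at"))  # state cleared, full reload
print(a, b, c)  # 2 0 2
```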

Hopefully this article has shown how to use QueryDatabaseTable to do incremental fetching of database tables in NiFi. Please let me know how/if this works for you. Cheers!

Comments
Explorer

Very good information, thanks for sharing, Matt. Can't wait to see your next one on GenerateTableFetch :)

Not applicable

This is really helpful. I tried it in NiFi 1.1.1, but it is not showing any value on the View State page. Is it because the implementation changed in version 1.1.1?

But when I tried the GenerateTableFetch processor, it did show the value.

I configured the table name as below, with 'Maximum-value Columns' set to 'nid':

(SELECT nid,node_type_id FROM cnbc_publish.node where type IN ('blogpost', 'cnbcnewsstory', 'partnerstory', 'wirestory', 'pressrelease', 'sponsored') and first_pub_date between 1476489600 and 1479744000 ) nd

Not applicable

Can you please provide the template file for the example above?

New Contributor

Hi Matt,

The above article really helps, but I have a requirement to filter the data, and I am looking for an option to add a WHERE clause.

Regards

Bhanu Chidigam

Really helpful! But I have a question:

How do I specify a starting value for the "Maximum-Value Column"? In your example, if I only want to get the rows with an "id" value larger than 50, how would I do that?

Super Guru

In QueryDatabaseTable, you'd set the Maximum-Value Column to "id" and add a dynamic property named "initial.maxvalue.id" with a value of 50. Make sure state has been cleared before running; the first time it executes, it will grab all rows with id > 50. This same capability for GenerateTableFetch is not yet available (NIFI-4283) but is coming soon.
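A rough sketch of how an initial maximum value can seed the very first fetch, assuming (as the advice to clear state first suggests) that a stored maximum takes precedence over the `initial.maxvalue.<column>` property. The helper `starting_max` and the 100-row SQLite table are hypothetical, and this is not NiFi's implementation:

```python
import sqlite3
from typing import Dict, Optional


def starting_max(state: Dict[str, int], props: Dict[str, str], column: str) -> Optional[int]:
    """Hypothetical: stored state wins; otherwise use initial.maxvalue.<column>."""
    if column in state:
        return state[column]
    initial = props.get(f"initial.maxvalue.{column}")
    # Property values arrive as strings; int() assumes an integer column like "id".
    return int(initial) if initial is not None else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO users VALUES (?)", [(i,) for i in range(1, 101)])

state: Dict[str, int] = {}             # empty, as if state had just been cleared
props = {"initial.maxvalue.id": "50"}  # the dynamic property from the reply above

start = starting_max(state, props, "id")
if start is None:
    rows = conn.execute("SELECT id FROM users ORDER BY id").fetchall()
else:
    rows = conn.execute("SELECT id FROM users WHERE id > ? ORDER BY id", (start,)).fetchall()
if rows:
    state["id"] = rows[-1][0]          # the next run continues from here

print(len(rows), rows[0][0], state["id"])  # 50 51 100
```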

Thumbs up!

Contributor

@Matt Burgess Can we run a join query across different tables here? I looked at the properties for QueryDatabaseTable, and there is no property for entering a query. Any help for when the query is a join across 2 different tables?

Super Guru

This feature was added in NiFi 1.4.0 (NIFI-4257)

Thanks, Matt, one more good article from you.

If my NiFi instance goes down or the processor is stopped for some reason, what will the stored maximum value(s) be after a restart? Will they be reset, or will the updated values be kept?

Also, I'm eagerly waiting for this processor to handle actual CDC data from the database change log file. That would solve many use cases, I hope, and may reduce the cost of other CDC-based tools.

New Contributor

Hi, is there any way to use this processor to check for a change in a specific record whose value is not necessarily increasing? Basically, I am looking to monitor a table that has only one row: a periodically updated summary showing the number of active loggers. I want to trigger another processor when this value either increases or decreases. Would it be possible to use the component state via Expression Language in a SQL query to trigger a flow file, i.e. select * from table where row data <> component state?

@Matt Burgess

Really helpful!! But I have a question: using this flow, how will we get the updated records?

You may use the ExecuteSQL processor for that. You can write a custom SQL query with that processor.

Thanks, Matt. Very informative for beginners like me.

Hi @mattburgess, I am using the same processor to fetch incremental data from relational tables. I have set the max rows fetch size to 500 and the max-value column to a timestamp. Can fetching data in batches lead to data loss? I have seen a few records with certain timestamps not being fetched during an incremental run, but they are fetched when I clear state and run a full load. I want to understand how the max rows feature works. I read your comment regarding the max fragments setting in this thread: https://community.hortonworks.com/questions/178505/querydatabasetable-processor-shutting-down.html. Does the same apply to the max rows fetch size too? Please suggest.

Visitor

Hi Matt,

Is there a suggested way to perform incremental fetching when the columns increase only weakly? We sometimes have multiple updates occurring with the same timestamp, and no secondary column is guaranteed to strictly (or even weakly) increase.

This doesn't seem like a flaw in the incremental logic so much as a hidden gotcha. @srijitachaturve mentions an instance of this problem in the previous comment.
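The gotcha can be reproduced in a few lines of Python. In this hypothetical `updates` table, `ts` is a coarse integer timestamp; after the first fetch stores 101 as the maximum, another row is written with that same timestamp. A strict `>` filter never sees it, while switching to `>=` picks it up at the cost of resending a row that was already fetched. This only demonstrates the trade-off under those assumptions; it is not a fix:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table; ts is a coarse timestamp that can repeat.
conn.execute("CREATE TABLE updates (id INTEGER, ts INTEGER)")
conn.executemany("INSERT INTO updates VALUES (?, ?)", [(1, 100), (2, 101)])

# First run: nothing stored yet, so everything is fetched and the max ts (101) is kept.
seen = conn.execute("SELECT id, ts FROM updates ORDER BY ts").fetchall()
max_ts = max(ts for _, ts in seen)

# Another write lands with the SAME timestamp as the stored maximum.
conn.execute("INSERT INTO updates VALUES (3, 101)")

strict = conn.execute("SELECT id FROM updates WHERE ts > ?", (max_ts,)).fetchall()
inclusive = conn.execute(
    "SELECT id FROM updates WHERE ts >= ? ORDER BY id", (max_ts,)
).fetchall()

print(strict)     # [] -> row 3 is never fetched
print(inclusive)  # [(2,), (3,)] -> row 3 found, but row 2 is sent again
```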

Explorer

@mburgess,

Could you please suggest how I can achieve the same with Cassandra? I tried to use the QueryDatabaseTable processor but was not able to find a suitable JAR and driver class name. So I gave up and tried QueryCassandra, but it has no option or mechanism like Maximum-value Columns and no View State option. So I can't find any way to get incremental changes from a Cassandra table.

Please suggest the best approach so I can proceed further.

New Member

@mburgess Helpful information shared.

I am using NiFi 1.7.1.

For my case, the incremental fetching does not seem to work correctly.

All records get ingested from the database, but not all of them make it to the destination.

The processors used are GenerateTableFetch, then ExecuteSQL, and then the other corresponding processors further down the flow.

The record ID is captured correctly in the GenerateTableFetch processor state, and it is up to date with the record ID in the source database.

However, it still misses some records when processing the files, leaving the number of records at the destination out of sync with the source database.

Am I missing something? Would adjusting the fetch schedule help, and how can I do that?

Version history
Revision #: 4 of 4
Last update: 02-12-2020 06:33 AM