Fetch records from a database incrementally based on time interval
Labels: Apache NiFi
Created 08-30-2018 03:07 PM
We have a requirement to pull records from a database table that has millions of records, and the only column we can use is a date column. Is there an option to do the incremental pull based on a time interval? For example, if the initial state is 2018-08-30 12:00:00.00, the first run has to fetch the records between 2018-08-30 12:00:00.00 and 2018-08-30 12:15:00.00, and the next run has to fetch the records between 2018-08-30 12:15:00.00 and 2018-08-30 12:30:00.00. Basically, it has to keep adding 15 minutes after each run. Is this possible to achieve through the QueryDatabaseTable processor?
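In plain SQL terms, the first run described above would amount to something like the following sketch (the table and column names source_table and event_ts are placeholders, not from the original post):
SELECT *
FROM source_table
WHERE event_ts >= '2018-08-30 12:00:00.00'
  AND event_ts < '2018-08-30 12:15:00.00'
-- the next run would shift both bounds forward by 15 minutes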
Created on 08-30-2018 11:13 PM - edited 08-18-2019 03:06 AM
You can achieve this use case with the ExecuteSql processor.
Flow:
UpdateAttribute configs:
last_iter_val
${getStateValue("next_iter_val")} //get the last iteration value
next_iter_val
${getStateValue("next_iter_val"):toDate("yyyy-MM-dd HH:mm:ss"):toNumber():plus(900000):format("yyyy-MM-dd HH:mm:ss")} //take the last iteration value and add 15 minutes (900000 ms) to it
ExecuteSql configs:
The SQL select query uses these attribute values to fetch the records from the table incrementally; a sketch is shown below.
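As a rough sketch of what the SQL select query property could look like (the table and column names source_table and event_ts are assumptions; NiFi evaluates the ${...} attribute references against the incoming flowfile before running the query):
SELECT *
FROM source_table
WHERE event_ts >= '${last_iter_val}'
  AND event_ts < '${next_iter_val}'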
I have attached the template XML; you can upload it and adjust it as per your requirements.
In addition, with the same kind of logic you can store your state in DistributedMapCache/HBase/Hive/HDFS, fetch the state, increment it by 15 minutes, and then pull the data from the table.
Refer to this for more details on other ways of storing state in NiFi.
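As a loose illustration of the Hive/SQL-style option (the etl_state table, its columns, and the job name are assumptions; UPDATE support and interval syntax depend on the store you pick):
-- fetch the stored watermark for this job
SELECT last_run_ts FROM etl_state WHERE job_name = 'incremental_pull'
-- after a successful pull, advance the watermark by 15 minutes
UPDATE etl_state SET last_run_ts = last_run_ts + INTERVAL '15' MINUTE WHERE job_name = 'incremental_pull'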
If the answer helped to resolve your issue, click the Accept button below to accept it; that would be a great help to community users looking for solutions to these kinds of issues.
Created 10-23-2018 04:41 AM
@Shu: in the incremental case, can we also fetch updated rows at the next scheduled run? For example, if I add 3 new rows and update a previous row, can we fetch the updated rows as well through a timestamp?
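One common pattern for this (an illustration, not something from the thread) is to filter on a last-modified timestamp column that the source application sets on both inserts and updates; the names below are placeholders:
SELECT *
FROM source_table
WHERE last_modified_ts >= '${last_iter_val}'
  AND last_modified_ts < '${next_iter_val}'
-- picks up newly inserted rows as well as rows updated within the window,
-- provided the application maintains last_modified_ts on every change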
Created 09-07-2018 06:23 AM
Hi @Shu, when we try to import the hcc-215124.xml file we get an error: GenerateFlowFile is not known to this NiFi instance. Could you please try to import it once into your NiFi instance and then share the file again?
Created 09-07-2018 01:13 PM
Hi @Shu, thanks very much, the solution is working. But in case of a failure, we are missing records.
For example:
Run 1: the query between "2018-09-06 12:00:00.00" and "2018-09-06 12:15:00.00" is processed successfully.
Run 2: the query between "2018-09-06 12:15:00.00" and "2018-09-06 12:30:00.00" fails for some reason.
Run 3: the fetch between "2018-09-06 12:30:00.00" and "2018-09-06 12:45:00.00" succeeds.
In this case we are missing the records for Run 2. Is there a way to maintain a transaction that fully commits or rolls back, or to combine the two processors into a single unit of work?
Created 09-08-2018 01:27 PM
For this case, please look into this link for storing and fetching the state from a distributed map cache.
With this approach we update the state only when the pull has succeeded; if the pull fails, we do not store the state.
