Created 08-30-2018 03:07 PM
Fetch records from a database incrementally based on a time interval

We need to pull records from a database table that has millions of records, and the only column we can use is a date column. Is there an option to fetch incrementally based on a time value? For example, if the initial state is 2018-08-30 12:00:00.00, the first run has to fetch the records between 2018-08-30 12:00:00.00 and 2018-08-30 12:15:00.00. Basically it has to keep adding 15 minutes after each run, so the next run fetches the records between 2018-08-30 12:15:00.00 and 2018-08-30 12:30:00.00. Is this possible to achieve through the QueryDatabaseTable processor?
Created on 08-30-2018 11:13 PM - edited 08-18-2019 03:06 AM
You can achieve this use case by using the ExecuteSQL processor.
flow:
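UpdateAttribute configs:
last_iter_val
${getStateValue("next_iter_val")}
(gets the last iteration value)
next_iter_val
${getStateValue("next_iter_val"):toDate("yyyy-MM-dd HH:mm:ss"):toNumber():plus(900000):format("yyyy-MM-dd HH:mm:ss")}
(gets the last iteration value and adds 15 minutes, i.e. 900000 ms, to it)
getStateValue only works when the processor's Store State property is set to "Store state locally"; the starting timestamp can be supplied through the Stateful Variables Initial Value property.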
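The next_iter_val expression is plain timestamp arithmetic: parse the stored value, add 900000 ms (15 × 60 × 1000), and format it back to a string. As a minimal sketch of that same step outside NiFi, assuming the `yyyy-MM-dd HH:mm:ss` pattern used above (written `%Y-%m-%d %H:%M:%S` in Python), here is a Python version; the helper name `next_window` is hypothetical:

```python
from datetime import datetime, timedelta

# Same layout as the NiFi pattern "yyyy-MM-dd HH:mm:ss"
FMT = "%Y-%m-%d %H:%M:%S"
# 900000 ms, the value passed to plus() in the expression above
STEP = timedelta(milliseconds=900000)


def next_window(last_iter_val: str) -> tuple[str, str]:
    """Return (last_iter_val, next_iter_val) for one 15-minute run.

    Hypothetical helper mirroring the two UpdateAttribute properties:
    last_iter_val is the stored state as-is, next_iter_val is that
    value plus 15 minutes, re-formatted as a string.
    """
    start = datetime.strptime(last_iter_val, FMT)
    return last_iter_val, (start + STEP).strftime(FMT)


if __name__ == "__main__":
    state = "2018-08-30 12:00:00"
    for _ in range(3):
        lo, hi = next_window(state)
        print(lo, "->", hi)
        state = hi  # the next run starts where this one ended
```

Running it prints the windows 12:00 to 12:15, 12:15 to 12:30 and 12:30 to 12:45, matching the sequence asked for in the question. Working on a parsed timestamp rather than the string also handles day rollover (23:50 becomes 00:05 on the next day).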
ExecuteSQL configs:
The query uses these attribute values to fetch the records incrementally from the table.
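The exact ExecuteSQL query is not reproduced in this thread. As a hypothetical illustration, it would bound the date column with `'${last_iter_val}'` and `'${next_iter_val}'`. A half-open range (`>=` start, `<` end) makes sure a row that falls exactly on a 15-minute boundary is fetched by only one run. The sketch below shows that filter with Python's built-in sqlite3 module and an in-memory table. The table name `events`, the column `created_at` and the helper `fetch_window` are placeholders, not part of the original flow:

```python
import sqlite3

# Placeholder table/column names; substitute your own.
QUERY = (
    "SELECT id, created_at FROM events "
    "WHERE created_at >= ? AND created_at < ? "  # half-open window [start, end)
    "ORDER BY created_at"
)


def fetch_window(conn: sqlite3.Connection, last_iter_val: str, next_iter_val: str) -> list[tuple]:
    """Fetch rows whose timestamp falls in [last_iter_val, next_iter_val)."""
    return conn.execute(QUERY, (last_iter_val, next_iter_val)).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, created_at TEXT)")
    conn.executemany(
        "INSERT INTO events (id, created_at) VALUES (?, ?)",
        [
            (1, "2018-08-30 12:00:00"),
            (2, "2018-08-30 12:14:59"),
            (3, "2018-08-30 12:15:00"),  # boundary row: belongs to the next window
            (4, "2018-08-30 12:29:00"),
        ],
    )
    print(fetch_window(conn, "2018-08-30 12:00:00", "2018-08-30 12:15:00"))
    print(fetch_window(conn, "2018-08-30 12:15:00", "2018-08-30 12:30:00"))
```

Here the timestamps are stored as fixed-width `yyyy-MM-dd HH:mm:ss` text, so string comparison orders them correctly. With a real DATE/TIMESTAMP column, the database's own comparison applies instead.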
I have attached the XML template; you can upload it and change it as per your requirements.
In addition, using the same kind of logic, you can store your state in DistributedMapCache/HBase/Hive/HDFS: fetch the state, increment it by 15 minutes, and then pull the data from the table.
Refer to this for more details on other ways of storing state in NiFi.
Created 10-23-2018 04:41 AM
@Shu: In the incremental case, can we also fetch updated rows on the next scheduled run? For example, if I add 3 new rows and update a previous row, can we fetch the updated row as well, using the timestamp?
Created 09-07-2018 06:23 AM
Hi @Shu, when we try to import the hcc-215124.xml file we get an error saying GenerateFlowFile is not known to this NiFi instance. Could you please try importing it into your NiFi instance and then share the file again?
Created 09-07-2018 01:13 PM
Hi @Shu, thanks very much, the solution is working. But in case of a failure, we miss records.
For example:
Run 1: the query between "2018-09-06 12:00:00.00" and "2018-09-06 12:15:00.00" is processed successfully.
Run 2: in the next run, the query between "2018-09-06 12:15:00.00" and "2018-09-06 12:30:00.00" fails for some reason.
Run 3: in the subsequent run, the DB fetch between "2018-09-06 12:30:00.00" and "2018-09-06 12:45:00.00" succeeds.
In this case we miss the records for Run 2. Is there a way to make this a transaction that either fully commits or rolls back, or to combine the two processors into a single unit of work?
Created 09-08-2018 01:27 PM
For this case, please look into this link for storing and fetching the state from a distributed map cache.
With this approach the state is updated only when the pull has succeeded. If the pull fails, the state is not stored, so the next run retries the same time window instead of skipping it.
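The key point is ordering: run the query first, and write the new state only after the fetch has succeeded. The Python sketch below shows that ordering by replaying the Run 1/Run 2/Run 3 scenario above. It makes two assumptions: a plain dict stands in for the distributed map cache, and `run_once`, `flaky_fetch` and the key name are hypothetical helpers, not NiFi APIs:

```python
from datetime import datetime, timedelta
from typing import Callable

FMT = "%Y-%m-%d %H:%M:%S"
STEP = timedelta(minutes=15)
KEY = "next_iter_val"  # hypothetical cache key


def run_once(state: dict, initial: str, fetch: Callable[[str, str], None]) -> bool:
    """Process one window; advance the stored state only if fetch succeeds."""
    start = state.get(KEY, initial)
    end = (datetime.strptime(start, FMT) + STEP).strftime(FMT)
    try:
        fetch(start, end)  # query [start, end) and deliver the rows
    except Exception:
        return False  # state untouched, so the same window is retried
    state[KEY] = end  # commit the new state only after a successful pull
    return True


if __name__ == "__main__":
    processed = []
    attempts = {"n": 0}

    def flaky_fetch(start: str, end: str) -> None:
        attempts["n"] += 1
        if attempts["n"] == 2:  # simulate the failed Run 2
            raise RuntimeError("query failed")
        processed.append((start, end))

    cache: dict = {}
    for _ in range(3):
        run_once(cache, "2018-09-06 12:00:00", flaky_fetch)
    print(processed)
```

Here the third run retries 12:15 to 12:30 instead of jumping to 12:30 to 12:45, so no window is lost. The trade-off is at-least-once delivery. If the fetch succeeds but the state write itself fails, the same window is pulled again, so downstream processing should tolerate duplicate rows.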