Fetch records from a database incrementally based on time interval

We have a requirement to pull records from a database table that holds millions of records, and the only column we can use is a date column. Is there an option to fetch incrementally based on a time value? For example, if the initial state is 2018-08-30 12:00:00.00, the first run has to fetch the records between 2018-08-30 12:00:00.00 and 2018-08-30 12:15:00.00, and the next run has to fetch the records between 2018-08-30 12:15:00.00 and 2018-08-30 12:30:00.00. Basically, it has to keep adding 15 minutes after each run. Is this possible to achieve with the QueryDatabaseTable processor?

1 ACCEPTED SOLUTION

Master Guru
@Saravanan Subramanian

You can achieve this use case with the ExecuteSQL processor.

flow:

87490-flow.png

UpdateAttribute configs:

87491-update-attribuite.png

last_iter_val (the value stored by the previous iteration):

${getStateValue("next_iter_val")}

next_iter_val (the previous value plus 15 minutes, i.e. 900000 ms):

${getStateValue("next_iter_val"):toDate("yyyy-MM-dd HH:mm:ss"):toNumber():plus(900000):format("yyyy-MM-dd HH:mm:ss")}
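To check the arithmetic outside NiFi, here is a minimal Python sketch that mirrors the next_iter_val expression: parse the stored timestamp, add 900000 ms, and format it back. The function names are hypothetical and only illustrate the steps; this is not NiFi code.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"               # Python equivalent of "yyyy-MM-dd HH:mm:ss"
STEP = timedelta(milliseconds=900_000)  # 15 minutes, same value as plus(900000)


def next_iter_val(stored):
    """Parse the stored timestamp, add 15 minutes, and format it back."""
    return (datetime.strptime(stored, FMT) + STEP).strftime(FMT)


def window(stored):
    """Return (last_iter_val, next_iter_val) for one run."""
    return stored, next_iter_val(stored)


state = "2018-08-30 12:00:00"
for _ in range(2):
    last, nxt = window(state)
    print(last, "->", nxt)
    state = nxt  # stands in for the value saved as the new state
```

Each pass uses the previous upper bound as the new lower bound, so consecutive windows line up without gaps.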

ExecuteSQL configs:

87492-es.png

Using these attribute values, ExecuteSQL fetches the records incrementally from the table.
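As a rough illustration of such a windowed fetch, the sketch below runs the same kind of query against an in-memory SQLite table. The table name `events` and column `created_at` are assumptions made for the example; in NiFi the two bounds would come from the last_iter_val and next_iter_val attributes. The half-open range (>= lower, < upper) is a design choice of this sketch so that a row sitting exactly on a boundary is read only once.

```python
import sqlite3


def fetch_window(conn, start, end):
    """Fetch rows with start <= created_at < end (half-open interval)."""
    cur = conn.execute(
        "SELECT id, created_at FROM events "
        "WHERE created_at >= ? AND created_at < ? ORDER BY created_at",
        (start, end),
    )
    return cur.fetchall()


# Hypothetical sample data; timestamps are stored as sortable text.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, created_at TEXT)")
conn.executemany(
    "INSERT INTO events (id, created_at) VALUES (?, ?)",
    [
        (1, "2018-08-30 12:00:00"),
        (2, "2018-08-30 12:14:59"),
        (3, "2018-08-30 12:15:00"),
        (4, "2018-08-30 12:29:00"),
    ],
)

first_run = fetch_window(conn, "2018-08-30 12:00:00", "2018-08-30 12:15:00")
second_run = fetch_window(conn, "2018-08-30 12:15:00", "2018-08-30 12:30:00")
print(first_run)
print(second_run)
```

With an inclusive BETWEEN on both bounds, the row at 12:15:00 would be returned by both runs.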

I have attached the XML; you can upload it and change it to fit your requirements.

hcc-215124.xml

In addition, the same logic works if you store your state in DistributedMapCache/HBase/Hive/HDFS: fetch the state, add 15 minutes to it, and then pull the data from the table.

Refer to this for more details on other ways of storing state in NiFi.

If this answer helped to resolve your issue, click the Accept button below to accept it. That helps community users find solutions quickly for these kinds of issues.

6 REPLIES

New Contributor

@Shu: In the incremental case, can we also fetch updated rows at the next scheduled run? For example, if I add 3 new rows and update a previous row, can the updated row be fetched as well, using the timestamp?

Hi @Shu, when we try to import the hcc-215124.xml file we get an error saying that GenerateFlowFile is not known to this NiFi instance. Could you please try importing it into your NiFi instance once and then share the file again?

Hi @Shu, thanks very much, the solution is working. But in case of a failure, we are missing records.

Example:

Run 1: the query between "2018-09-06 12:00:00.00" and "2018-09-06 12:15:00.00" is processed successfully.

Run 2: the query between "2018-09-06 12:15:00.00" and "2018-09-06 12:30:00.00" fails for some reason.

Run 3: the DB fetch between "2018-09-06 12:30:00.00" and "2018-09-06 12:45:00.00" is successful.

In the above case we miss the records for Run 2. Is there a way to get transactional behavior (fully commit or roll back), or to combine the two processors into a single unit of work?

Master Guru

@Saravanan Subramanian

For this case, please look into this link on storing and fetching the state from a DistributedMapCache.

With this approach the state is updated only when the pull has succeeded; if the pull fails, the state is not stored, so the next run retries the same interval.
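The idea of advancing the state only after a successful pull can be sketched in plain Python. This is a simulation under stated assumptions, not NiFi code: the dict stands in for the external state store, and `run_once` and `flaky_fetch` are hypothetical names. The fetch is made to fail on its second call, like Run 2 in the example above.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"
STEP = timedelta(minutes=15)


def run_once(state, fetch):
    """Try one window; the state advances only if the fetch succeeds."""
    start = state["next_iter_val"]
    end = (datetime.strptime(start, FMT) + STEP).strftime(FMT)
    try:
        rows = fetch(start, end)
    except Exception:
        return state, None  # keep the old state so the same window is retried
    return {"next_iter_val": end}, rows


calls = []


def flaky_fetch(start, end):
    """Hypothetical fetch that fails on its second call."""
    calls.append((start, end))
    if len(calls) == 2:
        raise RuntimeError("simulated database failure")
    return ["rows for %s .. %s" % (start, end)]


state = {"next_iter_val": "2018-09-06 12:00:00"}
for _ in range(4):
    state, rows = run_once(state, flaky_fetch)

for start, end in calls:
    print(start, "->", end)
```

The failed 12:15 to 12:30 window is requested again on the following run instead of being skipped, at the cost of the schedule falling one interval behind.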
