Support Questions

Find answers, ask questions, and share your expertise

How to rollback the state of max value column of QueryDatabaseTable if the downstream processor fails

avatar
New Contributor

Hello there..  I am new to Nifi and have started using QueryDatabaseTable for incremental fetch. I want to know how to handle a situation as below. For example

QDT processor fetched new data based on id column - id 50 to id 55

State of QDT processor (max value QDT processor processed/View State) - 55

Next Steps - ConvertAvrotoJson----->ConvertJsontoSQL ---->PutSQL

Now what if the PutSQL processor failed for some reason ? Should not the QDT processor roll back the state of max value from 55 to 49 so that next time QDT processor executes it again starts fetching id from 50  ?  When I checked the state of QDT processor it showed me 55 although the new records were not inserted in final table and when i execute the QDT processor again it fetched data from 56 causing data loss at the destination side.. Is there any way to let the QDT processor know that the downstream processors failed and it should now roll back the state  to previous value before the current run ?

1 ACCEPTED SOLUTION

avatar
Super Guru

Hi @Rashad_K ,

I dont believe such feature exists out of the box. Processors are designed to be isolated from each other and only can communicate via relationships, therefore there is no such thing as rollback which will reset the state of the QDT based on the result of the PutSQL. If I may suggest the following:

1- Depending on why would some records fail using PutSql, you can use the PutSql Failure relationship to handle errors. For example if the failure is due to connectivity issue you can retry it by looping the failure rel back to the PutSql or you can use ControlRate processor before retrying to wait sometime . If the failure is data related or syntax you can store those records in another table for farther processing and re import them back once the data is fixed. The idea from QDT is that once data is inserted into the target table it wont be changed (like logs for example) and any correction need to be sent as new record hence the max value doesnt have to be changed on data error issues.

 

2- Second option will only work if you have control over the target table in the QDT where records processed successfully are removed from the table. In this case if records failed for some reason and you want to re read then you can invoke the API to reset the state , which means the QDT will start from the top and re read failed records. However in this approach you have to be aware of concurrency issues in case the state is reset while some reocrds are being processed and have not been removed yet from the QDT table where you might end up re reading those records again. That is why I dont recommend this approach. 

Of course there could be other options out there but my advise and based on experience is try to pick a solution where isolation is in mind to avoid conflicts and unpredictable behavior.

Hope that helps. If it does, please accept solution.

Thanks

View solution in original post

2 REPLIES 2

avatar
Super Guru

Hi @Rashad_K ,

I dont believe such feature exists out of the box. Processors are designed to be isolated from each other and only can communicate via relationships, therefore there is no such thing as rollback which will reset the state of the QDT based on the result of the PutSQL. If I may suggest the following:

1- Depending on why would some records fail using PutSql, you can use the PutSql Failure relationship to handle errors. For example if the failure is due to connectivity issue you can retry it by looping the failure rel back to the PutSql or you can use ControlRate processor before retrying to wait sometime . If the failure is data related or syntax you can store those records in another table for farther processing and re import them back once the data is fixed. The idea from QDT is that once data is inserted into the target table it wont be changed (like logs for example) and any correction need to be sent as new record hence the max value doesnt have to be changed on data error issues.

 

2- Second option will only work if you have control over the target table in the QDT where records processed successfully are removed from the table. In this case if records failed for some reason and you want to re read then you can invoke the API to reset the state , which means the QDT will start from the top and re read failed records. However in this approach you have to be aware of concurrency issues in case the state is reset while some reocrds are being processed and have not been removed yet from the QDT table where you might end up re reading those records again. That is why I dont recommend this approach. 

Of course there could be other options out there but my advise and based on experience is try to pick a solution where isolation is in mind to avoid conflicts and unpredictable behavior.

Hope that helps. If it does, please accept solution.

Thanks

avatar
New Contributor

Thank you @SAMSAL .. appreciated the way of explanation.. generally i'l prefer to add/update the max.initialvalue.column property of QDT  to restart fetching missed data as a one time modification but I would love to see NIFI using fault tolerant mechanism in future so that the developers don't have to find the workaround or have to recover the lost data manually by modifying the QDT