How to detect all branches in a NiFi flow have finished processing?

Explorer

Hi! I'm using NiFi 2.0.0 M1 and building a flow inside a process group, and I've run into a problem.

The flow runs as an individual task. Each run reads some data from a table on a MySQL server, passes it through many LookupRecord processors, and then processes the matched and unmatched data separately. The flow ends up with 9 branches.

After all of that processing, I need to trigger an ExecuteSQLRecord processor that calls some stored procedures in MySQL. This processor should be triggered only when all of the branches above have finished running, and only once per task. To achieve this, I wanted to use the Notify and Wait processors to detect when all the branches have finished and the ExecuteSQLRecord processor can be triggered. However, I can only set a fixed number of signals to wait for in the Wait processor, while in practice not all 9 branches always run: sometimes a lookup processor produces no matched or no unmatched data, so the corresponding branch is never triggered.

Has anyone faced the same problem? Is there a good way to achieve this?

(Below is part of the flow. You can see that I tried to use Wait but was kind of stuck there 😭)

iriszhuhao_0-1707902687817.png

 

7 REPLIES

Community Manager

@iriszhuhao, Welcome to our community! To help you get the best possible answer, I have tagged our NiFi experts @MattWho @cotopaul @SAMSAL  who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager


Super Guru

Hi @iriszhuhao ,

The Wait processor must be paired with the Notify processor to work correctly, and it is better suited to cases where data is split downstream and each split FlowFile carries a common identifier and a split count that Wait and Notify can use, as explained in this article:

https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-mer...

When you get the data from SQL, does it all arrive in one FlowFile, or is it already split with one record per FlowFile? If you are not getting a lot of data, I would recommend getting it all in one FlowFile and then splitting it, so that you can use this pattern and run the final stored procedure once. The design would look something like this:

 

SAMSAL_3-1707916946248.png

 

In this case, the Release Signal Identifier of both Wait and Notify will be the value of fragment.identifier, and the Wait processor's Target Signal Count will be the value of fragment.count, both of which are written by the Split processor.
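
To make the counting behind this pattern concrete, here is a minimal Python sketch. It is a toy model, not NiFi code: the SignalCache class and the split helper are made up for illustration, and only the attribute names fragment.identifier and fragment.count come from the description above.

```python
from collections import defaultdict


class SignalCache:
    """Toy stand-in for the cache shared by Notify and Wait (hypothetical)."""

    def __init__(self):
        self._counts = defaultdict(int)

    def notify(self, signal_id, delta=1):
        # Notify: increment the counter stored under the signal identifier.
        self._counts[signal_id] += delta

    def is_released(self, signal_id, target_count):
        # Wait: release only once the counter has reached the target count.
        return self._counts[signal_id] >= target_count


def split(parent_id, records):
    """Mimic a Split processor: tag every piece with fragment.* attributes."""
    return [
        {
            "fragment.identifier": parent_id,
            "fragment.count": len(records),
            "content": record,
        }
        for record in records
    ]


cache = SignalCache()
fragments = split("task-42", ["row-a", "row-b", "row-c"])

released_after = []
for fragment in fragments:
    # Each fragment ends its branch in Notify, keyed by the shared identifier.
    cache.notify(fragment["fragment.identifier"])
    # Wait compares the running count with fragment.count.
    released_after.append(
        cache.is_released(fragment["fragment.identifier"],
                          fragment["fragment.count"])
    )

print(released_after)  # [False, False, True]
```

The target count travels with the data, so it never has to be hard-coded. The catch is that every fragment has to reach a Notify; a branch that receives no data sends no signal, which is the situation described in the question.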


Thanks

 


@iriszhuhao, to be honest, your problem is not really specific to NiFi; it is more of an architectural question.

How would you achieve what you are trying to achieve in another ETL Graphical Tool (Informatica, Talend, Knime, etc)? That logic should be applied in your case as well, as most of the tools perform the same thing but in a different manner. 

Now, going back to your question, and bearing in mind that this might not be the best solution: you could implement a signaling table where you record the execution of each of your 9 flows. Basically, you create a database table in which you store something like PROCESSOR_ID, PROCESSOR_NAME, EXECUTION_TIME, SUCCESS. From each of your success flows, you go into a processor where you generate this data and then insert it into your table. Next, out of your final processor (the one saving the data in your database), you could go into a RetryFlowFile where you set a penalty of X seconds/minutes/hours. From there, you go into an ExecuteSQLRecord and query your table. The output of the ExecuteSQLRecord is then sent into a query processor that checks whether all 9 flows have inserted a row in your table. If all 9 have, you proceed to do what you want. If not, you go into a processor that calls NiFi's REST API to check the status of the affected processors (you can do that with ExecuteStreamCommand, ExecuteScript, InvokeHTTP, etc.). You can call the /nifi-api/flow/processors/YOUR_MISSING_PROCESSOR/status API and check the results (statsLastRefreshed, flowFilesIn, flowFilesOut) to see whether something came, or should have come, through that flow. Then, based on your logic, you can either go on to check other processors or call the stored procedure to end your flow. Lastly, you can truncate your table once the execution is done, if you do not want to keep historical data there.
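
The bookkeeping side of this idea can be sketched in a few lines of Python. This is only an illustration under stated assumptions: it uses an in-memory SQLite database instead of MySQL so that it runs on its own, the table name branch_signal, the branch ids, and the host name are made up, and the REST call is not performed (only the URL from the description above is built).

```python
import sqlite3

EXPECTED_BRANCHES = 9  # assumption: one signal row expected per branch


def create_signal_table(conn):
    # Columns follow the post: PROCESSOR_ID, PROCESSOR_NAME, EXECUTION_TIME, SUCCESS.
    conn.execute(
        """CREATE TABLE branch_signal (
               processor_id   TEXT NOT NULL,
               processor_name TEXT NOT NULL,
               execution_time TEXT NOT NULL,
               success        INTEGER NOT NULL
           )"""
    )


def record_branch(conn, processor_id, processor_name, success=True):
    # Called at the end of each branch's success path.
    conn.execute(
        "INSERT INTO branch_signal VALUES (?, ?, datetime('now'), ?)",
        (processor_id, processor_name, int(success)),
    )


def missing_branches(conn, expected_ids):
    # Which expected branches have not reported a successful run yet?
    rows = conn.execute(
        "SELECT DISTINCT processor_id FROM branch_signal WHERE success = 1"
    )
    done = {row[0] for row in rows}
    return sorted(set(expected_ids) - done)


def status_url(base_url, processor_id):
    # Status endpoint named in the post; base_url is a placeholder.
    return f"{base_url}/nifi-api/flow/processors/{processor_id}/status"


conn = sqlite3.connect(":memory:")
create_signal_table(conn)
expected = [f"branch-{i}" for i in range(1, EXPECTED_BRANCHES + 1)]
for pid in expected[:7]:  # pretend only seven branches ran
    record_branch(conn, pid, f"Lookup {pid}")

todo = missing_branches(conn, expected)
print(todo)  # ['branch-8', 'branch-9']
print(status_url("https://nifi.example.com:8443", todo[0]))

conn.execute("DELETE FROM branch_signal")  # reset the table for the next run
```

An empty result from missing_branches is the signal to call the stored procedures; a non-empty one tells you which processors to look up through the status endpoint before deciding.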

Again, this might not be the best solution, but this is the first thing I would try if I were in your situation. 

Master Mentor

@iriszhuhao 

This might be a good use case for the FlowFile Concurrency and Outbound Policy configuration options on a process group. FlowFile Concurrency lets you place a portion of your dataflow into a process group and control how the initial FlowFile or batch of FlowFiles is allowed to enter that process group for processing. The Outbound Policy controls when the FlowFiles being processed in that process group are released to processor(s) downstream of it. Components downstream of the process group will not receive FlowFiles from it until all FlowFiles within the process group have either been auto-terminated or queued up at one or more output ports. When the outbound policy is met, the FlowFile(s) are released downstream and the process group's FlowFile Concurrency then allows the next batch in for processing. So it might make sense to place the portion of your dataflow made up of your nine concurrent branches in this bounded process group and, downstream of it, have your ExecuteSQLRecord processor call your final procedure, since at that point you know all branches have completed.

This solves your problem of not all nine branches always being used.
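
As a rough illustration of why the number of active branches stops mattering, here is a small Python model of the release rule described above. It is a toy, not NiFi's implementation; the BatchOutputGroup class and the branch names are made up for the example.

```python
class BatchOutputGroup:
    """Toy model of a process group that releases its output as one batch."""

    def __init__(self):
        self.in_flight = 0   # FlowFiles still being processed inside the group
        self.at_output = []  # FlowFiles queued at the group's output ports

    def start(self, count):
        # FlowFiles created inside the group by the branches that have data.
        self.in_flight += count

    def finish(self, flowfile, terminated=False):
        # A FlowFile stops being "in flight" by reaching an output port
        # or by being auto-terminated.
        self.in_flight -= 1
        if not terminated:
            self.at_output.append(flowfile)

    def release(self):
        # Nothing goes downstream while anything is still in flight.
        if self.in_flight > 0:
            return []
        batch, self.at_output = self.at_output, []
        return batch


group = BatchOutputGroup()
group.start(4)  # only 4 of the 9 branches received data in this run
for name in ("branch-1", "branch-3", "branch-7"):
    group.finish(name)

held = group.release()  # one FlowFile is still in flight, so nothing is released
group.finish("branch-9", terminated=True)
batch = group.release()

print(held, batch)  # [] ['branch-1', 'branch-3', 'branch-7']
```

The release condition looks only at what is still in flight, not at a target count, so a run that exercises four branches and a run that exercises all nine are handled the same way.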

Thank you,
Matt

Explorer

Hi @MattWho ,

Thank you for the reply. I believe FlowFile Concurrency and Outbound Policy can solve the detection problem. As a follow-up, is there a way to run the downstream processors only once per batch of FlowFiles? Multiple FlowFiles are released from the output port as a batch once I set Outbound Policy to Batch Output. (The downstream processor does not need to process any FlowFile; it only calls some stored procedures in the database, so it should run only once per input to the overall flow.)

Master Mentor

@iriszhuhao 

I would use a ModifyBytes processor within the process group, just before the output port, to remove the content of the FlowFiles. Then, outside the process group, add a MergeContent processor that merges all those 0-byte FlowFiles just released as a batch from the process group before you execute your stored procedures.
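
The effect of those two steps can be sketched in Python as follows. This is a simplified model rather than NiFi code: the helper names and the sample batch are made up, and it assumes the merge step is configured so that the whole released batch ends up in a single merged FlowFile.

```python
def strip_content(flowfile):
    # ModifyBytes-like step: keep the attributes, drop the content.
    return {"attributes": dict(flowfile["attributes"]), "content": b""}


def merge(batch):
    # MergeContent-like step: many FlowFiles in, one FlowFile out.
    if not batch:
        return None  # nothing was released, so there is nothing to trigger
    return {
        "attributes": {"merge.count": str(len(batch))},
        "content": b"".join(f["content"] for f in batch),
    }


calls = []


def call_stored_procedures(trigger):
    # Stand-in for ExecuteSQLRecord; it only records that it was invoked.
    calls.append(trigger["attributes"]["merge.count"])


# Hypothetical batch released by the process group: three branches had data.
batch = [
    {"attributes": {"branch": str(i)}, "content": b"rows..."}
    for i in (1, 3, 7)
]

trigger = merge([strip_content(f) for f in batch])
if trigger is not None:
    call_stored_procedures(trigger)

print(calls, len(trigger["content"]))  # ['3'] 0
```

However many FlowFiles the batch contains, the downstream step sees exactly one empty FlowFile and therefore runs once.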

Thank you,
Matt



Super Collaborator

I had a need for multiple lookups. I wrote a custom Groovy processor with several lookup services as part of it, which consolidated the lookups and routed accordingly, and it performed faster.