NiFi - Trigger a Processor once after the Queue gets empty for the previous processor.

Explorer

glad1_0-1704801946227.png

As seen in the image above, my requirement is for the "ExecuteSQL" processor to run only once, after all the queued files have been processed by the "PutDatabaseRecord" processor.

Right now it will run 190+57 times, but I want it to run once, after all the queued files have been processed. Is there any way to do this in NiFi?

1 ACCEPTED SOLUTION

Master Mentor

@glad1 

Based on what you shared, you may be able to accomplish what you are trying to do using a couple additional processors and a Child Process Group that utilizes "Process Group FlowFile Concurrency" settings.

So your use case involves:
For each FlowFile output by your ExecuteSQLRecord processor, you want to do ALL of the following before the next FlowFile produced by ExecuteSQLRecord is processed:
1. Split the FlowFile into X number of FlowFiles using SplitJson.
2. Modify each of the produced split FlowFiles using UpdateRecord.
3. Write all those modified FlowFiles to another database using PutDatabaseRecord.
4. Run ExecuteSQL only once to update the record indicating that all splits were processed.

Then repeat the above for the next produced FlowFile.

If this is correct, here is what you might want to try:
1. Create a Process Group that you will insert in the middle of this flow, as shown in the following image:

MattWho_0-1705006915117.png

2. Configure that Process group as follows:

MattWho_1-1705006982170.png

Important properties to set here are:
- Process Group FlowFile Concurrency = Single FlowFile Per Node
- Process Group Outbound Policy = Batch Output

What this does is allow only one FlowFile (per node in a multi-node NiFi) to enter this PG at any given time. Inside this Process Group you will handle the processing of this FlowFile (split, update, putDB). The outbound policy will not release any of the produced splits from the PG until all of them are queued at the output port.
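To make the gating behavior concrete, here is a toy Python model of it. This is not NiFi code and uses no NiFi API; the class `BatchGatedGroup` and the function `run` are hypothetical names invented for illustration. It only mimics the two rules described above: one source FlowFile is admitted at a time, and nothing leaves until every split from that source is waiting at the output.

```python
from collections import deque


class BatchGatedGroup:
    """Toy stand-in for a Process Group with single-FlowFile input and batch output."""

    def __init__(self):
        self.work = deque()   # splits still being processed inside the group
        self.held = []        # processed splits waiting at the output port
        self.busy = False     # True while one source FlowFile is being handled

    def try_admit(self, records):
        """Admit a source only when the group is empty; return True if admitted."""
        if self.busy:
            return False
        self.busy = True
        self.work.extend(records)
        return True

    def step(self):
        """Process one split; return the whole batch once nothing is left inside."""
        if self.work:
            self.held.append(self.work.popleft())
        if self.busy and not self.work:
            batch, self.held = self.held, []
            self.busy = False
            return batch
        return []


def run(sources):
    """Feed sources through the gated group and collect each released batch."""
    group = BatchGatedGroup()
    waiting = deque(sources)
    released = []
    while waiting or group.busy:
        if waiting and group.try_admit(waiting[0]):
            waiting.popleft()
        batch = group.step()
        if batch:
            released.append(batch)
    return released


batches = run([["a", "b", "c"], ["d", "e"]])
print(batches)
```

Each released batch contains splits from exactly one source, which is what lets a downstream merge produce one notification per source FlowFile.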

MattWho_2-1705007226621.png

You'll notice I added one additional, optional processor, ModifyBytes, to your dataflow (configured with "Remove All Content" = true). This zeroes out the content of the FlowFiles after they have been written by the PutDatabaseRecord processor. Those now-empty FlowFiles are then sent to the connection feeding the output port, where they are held until all of the produced splits are queued. They are then all released at once from the PG to the next new processor, MergeContent. The MergeContent processor merges all those FlowFiles into a single FlowFile that feeds your ExecuteSQL (UpdateRecordStatus toP) processor. Now you have a single notification for the original FlowFile that was split and processed. Additionally, you have created separation between each source FlowFile processed at the start of the dataflow.
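The effect of the last two steps can be sketched in a few lines of Python. This is only an illustration under stated assumptions: FlowFiles are modeled as plain dictionaries, and `strip_content` and `merge_batch` are hypothetical helpers, not NiFi functions. The `merge.count` attribute name is the one MergeContent writes on its merged output.

```python
def strip_content(flowfile):
    """Mimic ModifyBytes with content removal: keep the attributes, drop the content."""
    return {"attributes": dict(flowfile["attributes"]), "content": b""}


def merge_batch(flowfiles):
    """Mimic merging a released batch into one FlowFile by concatenating content."""
    return {
        "attributes": {"merge.count": str(len(flowfiles))},
        "content": b"".join(f["content"] for f in flowfiles),
    }


# Three splits that have already been written to the database.
splits = [{"attributes": {"row": str(i)}, "content": b"payload"} for i in range(3)]

emptied = [strip_content(f) for f in splits]
notification = merge_batch(emptied)

# One empty FlowFile remains, so the downstream SQL step is triggered once.
print(notification)
```

Because the content is already empty, the merge is cheap and the single resulting FlowFile acts purely as a trigger.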

If you found that any of the suggestions/solutions provided helped you with your issue, please take a moment to log in and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt


5 REPLIES 5

Master Mentor

@glad1 

The first question is: how do you know when all the FlowFiles have been processed through the PutDatabaseRecord processor? Here I see FlowFiles queued in every connection. So before tackling your question, I need to understand more about your complete dataflow.

1. How does the flow of data begin? 
2. Does it come in as one FlowFile that is split? 
3. Does it come in batches?  How do you know when a complete batch is ingested?
4. Is ingestion based on cron?
5. Does the ExecuteSQL use content of FlowFile Attributes from the incoming FlowFiles when it is executed?

Thanks,
Matt

Explorer

1&2) It comes from a single FlowFile, which is then split using SplitJson.

glad1_0-1704814339041.png

3&4) No.

5) No, it doesn't.

Explorer

Thanks @MattWho .

Adding MergeContent at the end, with Defragment as the merge strategy, helps.

I have one question: is the PG necessary if all I get is one FlowFile per day from the starting ExecuteSQLRecord processor?
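For context on the Defragment strategy: it reassembles fragments using the `fragment.identifier`, `fragment.index`, and `fragment.count` attributes, which SplitJson writes on each split. The Python sketch below is a simplified model of that grouping, not MergeContent's actual implementation; the function `defragment` and the dictionary layout are assumptions made for illustration, and it presumes the intermediate processors leave those attributes intact.

```python
from collections import defaultdict


def defragment(flowfiles):
    """Group fragments by identifier; emit one result when all of a group's parts arrive."""
    bins = defaultdict(dict)
    merged = []
    for ff in flowfiles:
        attrs = ff["attributes"]
        ident = attrs["fragment.identifier"]
        # Key each fragment by its index so duplicates do not inflate the count.
        bins[ident][int(attrs["fragment.index"])] = ff
        if len(bins[ident]) == int(attrs["fragment.count"]):
            parts = bins.pop(ident)
            merged.append({"fragment.identifier": ident, "merged": len(parts)})
    return merged


def fragment(ident, index, count):
    """Build a minimal fragment FlowFile (hypothetical helper for the demo)."""
    return {
        "attributes": {
            "fragment.identifier": ident,
            "fragment.index": str(index),
            "fragment.count": str(count),
        }
    }


# Fragments from two sources arriving interleaved.
arrivals = [
    fragment("A", 0, 2),
    fragment("B", 0, 3),
    fragment("B", 1, 3),
    fragment("A", 1, 2),
    fragment("B", 2, 3),
]
print(defragment(arrivals))
```

In this model, fragments from different sources never end up in the same merged result, even when they arrive interleaved.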

Master Mentor

@glad1 
No, it is not necessary. I suggested it because I was still unclear how often your initial ExecuteSQL was producing a source file. The PG makes it easy to throttle processing per source FlowFile, so you would get one merged FlowFile for each produced FlowFile.

Thanks,
Matt