Apache NiFi ControlRate Processor Issue

Hi everyone,

Can anyone help us fix the following issue?

I have a flow where I use a ControlRate processor to control the data flow, and I submit tasks to Druid using the files it releases. For example, say I have 10 files in total. First, ControlRate sends 2 FlowFiles downstream; I submit those files as Druid tasks and fetch their status. Then:

- if 1 task is running and the other has succeeded, ControlRate should send only 1 more file;
- if both tasks are still running, no new tasks should be submitted until they complete;
- if both have succeeded, ControlRate should send 2 more files.

I count the tasks that are still running and store that count in an attribute. Is there a way for ControlRate to take the value of this count attribute as its Maximum Rate property? I've seen that the Rate Control Criteria property can be set to Attribute Value, but I'm confused about how to set the rate from an attribute value.
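The release rule described above boils down to "2 minus the number of tasks still running". A minimal Python sketch of just that arithmetic, assuming the Druid task statuses have already been fetched as strings such as "RUNNING" and "SUCCESS" (the function name and the status values are illustrative assumptions, not taken from the actual flow):

```python
# Hypothetical helper: how many new files may be released, given the statuses
# of the Druid tasks submitted earlier. Status strings are assumed for illustration.

MAX_CONCURRENT_TASKS = 2


def files_to_release(task_statuses, max_concurrent=MAX_CONCURRENT_TASKS):
    """Return how many new files can be submitted without exceeding max_concurrent."""
    running = sum(1 for status in task_statuses if status == "RUNNING")
    # Never return a negative number, even if more tasks are running than allowed
    return max(0, max_concurrent - running)


print(files_to_release(["RUNNING", "SUCCESS"]))  # 1
print(files_to_release(["RUNNING", "RUNNING"]))  # 0
print(files_to_release(["SUCCESS", "SUCCESS"]))  # 2
```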

Hi @sarithe,

I don't think you can achieve what you want with the ControlRate processor: it lets flowfiles through until the Maximum Rate for the configured Time Duration is reached and then holds the rest, and it doesn't keep state that would let you drive the Maximum Rate property from values carried across multiple files. Instead, I would suggest taking advantage of the UpdateAttribute processor, since it can store state for a given attribute. You can use a stateful attribute as a counter that increments/decrements as flowfiles pass through, and act accordingly.

You then check the counter with RouteOnAttribute: if it is below your threshold (2 in your case), the flowfile proceeds to be processed; if the counter has reached the threshold, the flowfile keeps looping back until the flowfiles being processed are done, at which point the counter is decremented. I built a simple flow to illustrate this; you can use the same pattern for your case. The flow does the following:

1- After getting the input, use UpdateAttribute to set the isProcessed flag to 0

2- Use another UpdateAttribute with Store State set to "Store State Locally". This processor defines rules that increment/decrement flowfileCounter based on the isProcessed flag:

    - if isProcessed = 0 and flowfileCounter < 2, then flowfileCounter += 1

    - if isProcessed = 1, then flowfileCounter -= 1

Make sure to set "Stateful Variables Initial Value" to 0 to initialize your counter. Since the counter starts from 0, when the 2nd flowfile arrives the counter is only 1, which is still < 2, so that flowfile is counted as well.

3- In RouteOnAttribute, create two conditional relationships:

    - ProcessedRel: checks whether the isProcessed flag is set to 1. This routes to the end of the flow; you can terminate it if no further processing is needed.

    - NotProcessedAnd<2: checks whether flowfileCounter is less than 2 and isProcessed is set to 0; if both are true, the flowfile proceeds to be processed.

If neither condition is met, use the unmatched relationship to loop back to the UpdateAttribute in step 2.

4- Once the flowfiles are processed, use another UpdateAttribute to set the isProcessed flag to 1, and point its success relationship back to the UpdateAttribute in step 2 so the counter is decremented by the defined rules.
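Taken together, the four steps behave like a counting semaphore built from a stateful attribute. As a rough, tick-based Python model (not NiFi code; the tick loop, the task duration and the function name are assumptions for illustration), the sketch below runs 10 flowfiles through the loop and reports how many finished and the most that were ever being processed at once:

```python
from collections import deque

THRESHOLD = 2  # max flowfiles allowed in the "processing" stage at once


def simulate(num_flowfiles=10, task_duration=3):
    """Tick-based model of steps 1-4; returns (flowfiles completed, peak in flight)."""
    counter = 0  # stateful flowfileCounter, initial value 0
    # Step 1: every incoming flowfile starts with isProcessed = 0
    waiting = deque({"id": i, "isProcessed": "0"} for i in range(num_flowfiles))
    in_flight = []  # [flowfile, ticks remaining] pairs currently being processed
    completed = 0
    peak = 0

    while completed < num_flowfiles:
        # Step 4: finished flowfiles get isProcessed = 1 and go back to step 2,
        # where Rule 2 decrements the counter; ProcessedRel then ends the flow.
        still_running = []
        for flowfile, remaining in in_flight:
            if remaining == 0:
                flowfile["isProcessed"] = "1"
                counter -= 1
                completed += 1
            else:
                still_running.append([flowfile, remaining - 1])
        in_flight = still_running

        # Steps 2-3: each waiting flowfile passes the stateful UpdateAttribute and
        # RouteOnAttribute once per tick; unmatched ones loop back to the queue.
        for _ in range(len(waiting)):
            flowfile = waiting.popleft()
            if counter < THRESHOLD:  # Rule 1 fires and NotProcessedAnd<2 matches
                counter += 1
                in_flight.append([flowfile, task_duration])
            else:  # unmatched relationship: loop back to step 2
                waiting.append(flowfile)

        peak = max(peak, len(in_flight))

    return completed, peak


print(simulate())  # (10, 2)
```

The peak never exceeds the threshold because a flowfile only enters the processing stage after the counter has room, and only leaves it by decrementing the counter.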

Here is what my flow looks like:

[Screenshot: overall flow]

GenerateFlowFile: simulates creating the input and sets the isProcessed flag to 0 on each flowfile (step 1 above).

[Screenshot: GenerateFlowFile configuration]

Update flowfileCounter stateful (UpdateAttribute, step 2): defines the rules for setting the stateful flowfileCounter:

[Screenshot: UpdateAttribute configuration for step 2]

Click Advanced to define the Rules & Actions:

Rule 1 / Action 1: increment flowfileCounter if the flowfile is not processed and the counter is below the threshold.

[Screenshot: Rule 1 / Action 1]

Rule 2 / Action 2: decrement flowfileCounter if the flowfile is processed.

[Screenshot: Rule 2 / Action 2]
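To make the two rules concrete, here is a rough Python model of the stateful UpdateAttribute (not NiFi code). In NiFi the actions would presumably use the getStateValue Expression Language function, e.g. ${getStateValue('flowfileCounter'):plus(1)} for Rule 1 and ${getStateValue('flowfileCounter'):minus(1)} for Rule 2, but the exact expressions are in the screenshots, so treat those as assumptions. The model also assumes each counted flowfile carries the counter value it saw before the increment, which is what lets the 2nd flowfile pass a "< 2" check:

```python
THRESHOLD = 2


class FlowfileCounterState:
    """Rough model of UpdateAttribute with Store State = "Store State Locally"."""

    def __init__(self, initial_value=0):
        # Mirrors "Stateful Variables Initial Value" = 0
        self.value = initial_value

    def apply_rules(self, attributes):
        """Apply Rule 1 / Rule 2 to one flowfile; attributes are strings, as in NiFi."""
        updated = dict(attributes)
        is_processed = updated.get("isProcessed")
        if is_processed == "0" and self.value < THRESHOLD:
            # Rule 1: stamp the value seen (modeling assumption), then increment
            updated["flowfileCounter"] = str(self.value)
            self.value += 1
        elif is_processed == "1":
            # Rule 2: a processed flowfile frees one slot
            self.value -= 1
        # Neither rule matched: state and attributes are left unchanged
        return updated


state = FlowfileCounterState()
first = state.apply_rules({"isProcessed": "0"})   # counter 0 -> 1
second = state.apply_rules({"isProcessed": "0"})  # counter 1 -> 2
third = state.apply_rules({"isProcessed": "0"})   # counter stays 2, not counted
state.apply_rules({"isProcessed": "1"})           # counter 2 -> 1
print(first["flowfileCounter"], second["flowfileCounter"], "flowfileCounter" in third, state.value)
# prints: 0 1 False 1
```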

RouteOnAttribute: proceed if the flowfile is not processed and flowfileCounter < 2; finish if isProcessed is set to 1; otherwise loop back to step 2.

[Screenshot: RouteOnAttribute configuration]
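The routing decision can be modeled in a few lines of Python as well. The relationship names come from the post; the property values would presumably be something like ProcessedRel = ${isProcessed:equals('1')} and NotProcessedAnd<2 = ${isProcessed:equals('0'):and(${flowfileCounter:lt(2)})}, though those expressions are an assumption rather than a copy of the screenshot:

```python
THRESHOLD = 2


def route(attributes):
    """Return the RouteOnAttribute relationship a flowfile would take (simplified)."""
    if attributes.get("isProcessed") == "1":
        return "ProcessedRel"
    counter = attributes.get("flowfileCounter")
    # NiFi attributes are strings, so convert before comparing with the threshold
    if attributes.get("isProcessed") == "0" and counter is not None and int(counter) < THRESHOLD:
        return "NotProcessedAnd<2"
    # No condition matched: loop back to the stateful UpdateAttribute (step 2)
    return "unmatched"


for attrs in ({"isProcessed": "1"},
              {"isProcessed": "0", "flowfileCounter": "1"},
              {"isProcessed": "0"}):
    print(attrs, "->", route(attrs))
```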

Do Something (ControlRate): this just simulates doing something with the flowfiles from the NotProcessedAnd<2 relationship above. This is where you would process the next two flowfiles (in your case, submit the Druid tasks).

Set isProcessed to 1 (UpdateAttribute, step 4): sets the isProcessed flag to 1 and points the success relationship back to the UpdateAttribute in step 2 to decrement the flowfile counter using Rule 2.

[Screenshot: Set isProcessed to 1 configuration]

If anyone thinks there is a better way, please feel free to share your input.


Thanks

@sarithe
You may also want to look at the Process Group (PG) FlowFile Concurrency configuration options as a possible design path, since there does not appear to be any dependency between task 1 and task 2 in your description.

[Screenshot: Process Group FlowFile Concurrency configuration]

You just want to make sure that no more than 2 tasks execute concurrently. Move the processors that handle the two task executions into two separate child PGs, each configured with the "Single FlowFile per Node" Process Group FlowFile Concurrency. Inside each PG, create an input port and an output port, and build your task dataflow between these two ports. Outside these PGs (at the parent PG level), you handle the triggering FlowFiles. Because of the FlowFile Concurrency setting, each task PG lets one FlowFile in at a time and does not allow any more FlowFiles to enter until that FlowFile has left the PG.
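A rough way to see why this caps concurrency: model each child PG as a worker that holds at most one FlowFile and only takes the next one after the current one has left. In this Python sketch (not NiFi code) two threads stand in for the task PGs and both pull from one shared parent-level queue; that shared queue, the sleep times and all names are assumptions for illustration:

```python
import queue
import threading
import time


class ConcurrencyTracker:
    """Counts FlowFiles currently inside any task PG and remembers the peak."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0
        self.processed = {}

    def enter(self):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)

    def leave(self, pg_name):
        with self._lock:
            self.current -= 1
            self.processed[pg_name] = self.processed.get(pg_name, 0) + 1


def task_pg(name, work_seconds, inbox, tracker):
    """One child PG with "Single FlowFile per Node": it takes the next FlowFile
    only after the current one has left through the output port."""
    while True:
        try:
            inbox.get_nowait()          # input port: next triggering FlowFile
        except queue.Empty:
            return
        tracker.enter()
        time.sleep(work_seconds)        # stand-in for the task dataflow
        tracker.leave(name)             # output port: FlowFile leaves the PG


def run(num_flowfiles=10):
    inbox = queue.Queue()               # parent-level queue of triggering FlowFiles
    for i in range(num_flowfiles):
        inbox.put({"id": i})
    tracker = ConcurrencyTracker()
    pgs = [
        threading.Thread(target=task_pg, args=("Task 1", 0.01, inbox, tracker)),
        threading.Thread(target=task_pg, args=("Task 2", 0.03, inbox, tracker)),  # slower task
    ]
    for pg in pgs:
        pg.start()
    for pg in pgs:
        pg.join()
    return tracker.processed, tracker.peak


processed, peak = run()
print(processed, "peak concurrent tasks:", peak)
```

Because each PG admits a new FlowFile only after the previous one has left, the number of tasks in flight can never exceed the number of task PGs (two here), and the faster PG naturally ends up processing more FlowFiles.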

[Screenshot: example flow with the Task 1 and Task 2 PGs]

As you can see in the example above, each task PG processes only a single FlowFile at a time. I built this example so that task 2 always takes longer, which is why the Task 1 PG has output more FlowFiles than the Task 2 PG, while still ensuring that only two tasks are ever executing concurrently.


Thank you,
Matt