Created 12-05-2024 04:29 AM
Hi All,
So I have a flow that at the end changes particular status through API.
The direct upstream processor is putEmail, which is supposed to notify dynamically generated contacts with data.
After this e-mail is sent, the requirement is to wait 24 hours before using the API call.
I have had a hard time understanding the Wait processor and its relation to signal release. ControlRate does not really work (or I cannot set it up properly).
What would be a suggested way to deal with such a scenario?
Created 12-05-2024 06:38 AM
@bgumis
One way you can accomplish what you want to do is using an UpdateAttribute processor and RouteOnAttribute processor between your PutEmail and InvokeHTTP processors.
The UpdateAttribute would be used to add an attribute to the FlowFile with the current time in milliseconds using NiFi Expression Language (NEL) statement:
${now():toNumber()}
The success relationship of the update Attribute processor would be routed to the RouteOnAttribute processor. In the RouteOnAttribute processor you setup a new property that compares the records timestamp added to FlowFile by UpdateAttribute against the current time in milliseconds to see if 24 hours has passed yet. If true, the FlowFile will route to the expression property name relationship. If false, the FlowFile will route to the "unmatched" relationship which can be looped back on RouteOnAttribute. The "unmatched" relationship should be configured to use "retry" to trigger penalization of FlowFiles. This helps limit cpu usage by slowing how often the FlowFile is re-processed by penalizing the FlowFile on each retry. The "Retry Max Back Off Period" would translate into the max time beyond 24 hours a FlowFile may get routed to the invokeHTTP. I assume 24 hours is a min wait period and not a hard limit.
The NEL statement used in RouteOnAttribute would look like this:
${now():toNumber():minus(${emailTime}):ge('86400000')}
The other option would require a custom script to set penalization of 24 hours on each FlowFile after the putEmail processor.
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 12-05-2024 05:04 AM
Hi @bgumis ,
Can you please provide more info on your flow like screenshot or description of the steps and where do you expect the wait to happen? The wait processor is not intended for this scenario. Can you provide more information on why the ControlRate is not working for you wither through the configuration or through where it should be placed in the dataflow?
Thanks
Created 12-05-2024 05:13 AM
Hi Samsal,
Sure, here is the flow itself:
Here is config of controlRate processor
I have also tried data rate, but the moment the fileFlow arrives at that processor, it immediately goes to queue for invokeHTTP.
Created 12-05-2024 06:38 AM
@bgumis
One way you can accomplish what you want to do is using an UpdateAttribute processor and RouteOnAttribute processor between your PutEmail and InvokeHTTP processors.
The UpdateAttribute would be used to add an attribute to the FlowFile with the current time in milliseconds using NiFi Expression Language (NEL) statement:
${now():toNumber()}
The success relationship of the update Attribute processor would be routed to the RouteOnAttribute processor. In the RouteOnAttribute processor you setup a new property that compares the records timestamp added to FlowFile by UpdateAttribute against the current time in milliseconds to see if 24 hours has passed yet. If true, the FlowFile will route to the expression property name relationship. If false, the FlowFile will route to the "unmatched" relationship which can be looped back on RouteOnAttribute. The "unmatched" relationship should be configured to use "retry" to trigger penalization of FlowFiles. This helps limit cpu usage by slowing how often the FlowFile is re-processed by penalizing the FlowFile on each retry. The "Retry Max Back Off Period" would translate into the max time beyond 24 hours a FlowFile may get routed to the invokeHTTP. I assume 24 hours is a min wait period and not a hard limit.
The NEL statement used in RouteOnAttribute would look like this:
${now():toNumber():minus(${emailTime}):ge('86400000')}
The other option would require a custom script to set penalization of 24 hours on each FlowFile after the putEmail processor.
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created on 12-05-2024 05:41 AM - edited 12-05-2024 05:42 AM
Couple of things I noticed:
1- You dont need the Wait processor as I said it doesnt apply in this case. Wait & Notify processor work together please review info about this here. You can remove the Wait processor and replace with the ControlRate processor.
2- In the Control Rate set the Max Rate = 1 . This will allow only one flowfile entry per specified Time Duration. The first flowfile passed after adding this processor or changing its setting will go through without waiting but any subsequent flowfiles will wait. Each passed flowfile will reset the time so the next flowfile will wait the full Time Duration.
3- Why do you have the InvokeHttp disabled?! If its disabled then nothing will run coming from the ControlRate processor.
Created 12-05-2024 05:52 AM
Hi Samsal,
I added Wait processor there for debugging if needed, and invokeHTTP is disabled because it would hamper my testing due to the fact the initial trigger is the status change (So status changes to Started, fileflow is generated, then at the end it changes the status to ready).
As stopped processors are affected by starting/stopping the entire process group, I found it much easier to disable it after I already confirmed it worked the way I wanted.
Yeah, then I am in a bit of a pickle, I would like to utilize either total life duration of a fileFlow or when it went through last processor.
The total duration is fine as the whole cycle lasts few seconds
Created 12-05-2024 06:49 AM
I apologize but this so confusing to me. Im having hard time making heads from tail here. Also it seems you are trying to change status from disabled to ready (enabled) I'm not sure you can do that even with the API , once the processor is disabled it cant be changed until manually enabled. I dont think I can help you if I dont understand your problem statement very well but I hope someone else can. My advise is to re write your problem statement so that its clear to follow begging to end and then articulate what the problem is and how you are trying to solve, if its too long or to complex then try to come up simplified scenario that would isolate the problem you are trying to solve.
Created 12-06-2024 12:25 AM
@SAMSAL
Hi, sorry that I was not clear enough in my question, was doing my best 😞
@MattWho
This works, thank you very much.
I have set a maximum back-off time for 1 minute as this process is quite time-sensitive and set the queue before RouteonAttribute as well as looped relationship to prioritize the oldest fileflow.
Thank you both for help!