how to add delay for 10 seconds while retrying a flow file in NIFI

Contributor

Hi,

I am retrying a flow file 3 times before moving it to the failure relationship. I want a 10-second gap between retries, so the 2nd retry should happen 10 seconds after the first.

I don't understand how to use the Wait processor for this. How should that processor be configured?

I am using an older version of NiFi, so I cannot use the RetryFlowFile processor.

 

retry.png

2 ACCEPTED SOLUTIONS

Master Mentor

@Rohit1997jio   The images you shared above are from CFM 2.1.5. HDF 3.3.0 was released way back in 2017. I strongly encourage you to upgrade away from such an old version for security and bug-fix reasons.

That being said, you could use the ExecuteScript processor to manually penalize a FlowFile.  A penalized FlowFile will be ignored by the downstream processor until the penalty duration has lapsed. 

  • On the "Settings" tab, set "Penalty Duration" to the desired 10 sec. This penalty is applied to every FlowFile the ExecuteScript processor executes against.
  • On the "Properties" tab, select "Groovy" as the "Script Engine" property value.
  • On the "Properties" tab, add the following script to the "Script Body" property:
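// Take the next FlowFile from the incoming queue; exit if none is available
FlowFile flowFile = session.get()
if (flowFile == null) {
  return
}

// Apply the processor's configured Penalty Duration, then route to success
session.transfer(session.penalize(flowFile), REL_SUCCESS)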

Use the ExecuteScript processor in place of your "ControlRate" processor in your dataflow to apply the 10 second penalty to all FlowFiles being looped for retry.
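
To illustrate what a penalty does to a queued FlowFile, here is a small standalone Python sketch. It is a toy model, not NiFi's actual queue implementation: the `FlowFile` class, `penalize`, and `next_available` are hypothetical names invented for this example, and time is passed in by hand as plain seconds.

```python
from dataclasses import dataclass


@dataclass
class FlowFile:
    # Hypothetical stand-in for a NiFi FlowFile; only the fields this model needs.
    name: str
    penalty_expires_at: float = 0.0  # seconds; 0.0 means "never penalized"


def penalize(flow_file: FlowFile, now: float, penalty_duration: float = 10.0) -> FlowFile:
    """Mark the FlowFile as unavailable until now + penalty_duration."""
    flow_file.penalty_expires_at = now + penalty_duration
    return flow_file


def next_available(queue: list, now: float):
    """Return the first FlowFile whose penalty has lapsed, or None if all are penalized."""
    for flow_file in queue:
        if flow_file.penalty_expires_at <= now:
            return flow_file
    return None


# One FlowFile penalized at t=0 with a 10 s penalty, one that was never penalized.
queue = [penalize(FlowFile("retry-me"), now=0.0), FlowFile("fresh")]

print(next_available(queue, now=5.0).name)   # fresh (the penalized file is skipped)
print(next_available(queue, now=10.0).name)  # retry-me (the penalty has lapsed)
```

The point of the model is that a penalized FlowFile does not block the queue: other FlowFiles keep flowing while it waits out its penalty.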

Thank you,
Matt

Super Guru

@Rohit1997jio,

I was going to answer along the same lines as @MattWho, who beat me to it, but I will post my answer anyway in case it helps.
@MattWho is correct: you can use the ExecuteScript processor to simulate the RetryFlowFile processor. This is better than the RouteOnAttribute option because while a FlowFile is penalized the processor sits idle, whereas with RouteOnAttribute the FlowFile keeps looping through the unmatched relationship for the whole wait period. Here is my solution. It does not need RouteOnAttribute, but it does need more code.

SAMSAL_6-1703261364929.png

I'm using Groovy for my script. The process relies on two attributes:

1- totalRetry: a counter that is incremented every time the file is sent to retry, used to check the retry threshold. It starts at 0.

2- isPenalized: tracks whether the file still needs to be penalized before the next retry (isPenalized == null) or has already been penalized, which means it is ready for the next retry.

SAMSAL_7-1703266423329.png

The groovy script:

 

flowFile = session.get()
if(!flowFile) return

// get totalRetry and isPenalized attributes
totalRetry = flowFile.getAttribute('totalRetry')
isPenalized = flowFile.getAttribute('isPenalized')

// if it's the first time, set the value to 0 (no retry yet, first penalize)
totalRetry = !totalRetry ? 0 : totalRetry.toInteger()

// if the retry count has reached the threshold (3 in this case) then send to failure rel (expired).
// Total wait time (penalize time) 3*10 sec = 30 secs
// Note: the check is >= 3; with > 3 a fourth retry (and a fourth penalty) would be allowed.
if(totalRetry >= 3)
{
	session.transfer(flowFile, REL_FAILURE)
	return
}

// if totalRetry has not reached the threshold and the file is not
// penalized (isPenalized == null) then penalize and send back to the upstream queue
if(!isPenalized)
{
	flowFile = session.putAttribute(flowFile, 'isPenalized', '1')
	flowFile = session.penalize(flowFile)
	// transfer without a relationship returns the FlowFile to the queue it came from
	session.transfer(flowFile)
	return
}
// Otherwise the file has already been penalized: send to retry and increment totalRetry
flowFile = session.putAttribute(flowFile, 'totalRetry', (totalRetry+1).toString())
flowFile = session.removeAttribute(flowFile, 'isPenalized')
session.transfer(flowFile, REL_SUCCESS)
return

 

You can set the Penalty Duration on the processor's SETTINGS tab.
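
To check how many retries the two-attribute logic produces, the routing decisions can be traced outside NiFi. The following Python sketch is only a model of the attribute handling described above, not the script itself: `route` and `trace` are hypothetical helper names, FlowFile attributes are represented as a plain dict, and it assumes every retry fails and that the intent is exactly `max_retries` retries (hence the `>=` comparison; a `>` comparison would allow one more retry).

```python
def route(attrs: dict, max_retries: int = 3):
    """Model one pass through the script: return (relationship, updated attributes)."""
    total_retry = int(attrs.get("totalRetry", 0))

    # Threshold reached: give up and route to failure.
    if total_retry >= max_retries:
        return "failure", attrs

    # Not yet penalized: flag it, penalize it, and return it to the incoming queue.
    if "isPenalized" not in attrs:
        return "self (penalized)", {**attrs, "isPenalized": "1"}

    # Already penalized: clear the flag, count the retry, and send it on.
    updated = {k: v for k, v in attrs.items() if k != "isPenalized"}
    updated["totalRetry"] = str(total_retry + 1)
    return "success", updated


def trace(max_retries: int = 3) -> list:
    """Follow one FlowFile that fails on every retry until it reaches failure."""
    attrs, path = {}, []
    while True:
        relationship, attrs = route(attrs, max_retries)
        path.append(relationship)
        if relationship == "failure":
            return path


path = trace()
print(path.count("self (penalized)"))  # 3 penalties, i.e. 3 * 10 s = 30 s of waiting
print(path.count("success"))           # 3 retries
print(path[-1])                        # failure
```

Each "self (penalized)" step corresponds to one Penalty Duration of waiting, so with a 10 sec penalty the model spends 30 secs waiting before the FlowFile reaches failure.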

SAMSAL_8-1703266798552.png

 

Hope this helps

Thanks

 

 

 


7 REPLIES

Super Guru

Hi @Rohit1997jio ,

What version are you using? Have you looked into the ControlRate processor ( https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apach... )?

The Wait processor is not meant to be used by itself. It should be paired with the Notify processor, and it is used for scenarios such as splitting data into multiple FlowFiles and waiting for all of them to be processed before performing a certain step.

If that helps please accept solution.

Thanks

 

Contributor

The ControlRate processor is not working.

reply1.png
reply2.png

Super Guru

I noticed you are using "data rate" for the Rate Control Criteria. I don't think this is the correct criteria; it should be "flowfile count" instead. Also, since you want to wait 10 seconds, the Time Duration property should be set accordingly.

 

SAMSAL_1-1703080960714.png

Keep in mind that setting ControlRate this way means the processor will allow 1 FlowFile to proceed to the success relationship every 10 secs. So if 3 files are added to the queue around the same time, the first will proceed after 10 seconds, the second after 20 secs, and the third after 30 secs. If you only want a 10 sec delay on each file, you can use RouteOnAttribute as mentioned here: https://stackoverflow.com/questions/61875809/introduce-time-delay-before-moving-flow-files-to-next-p...

 

I also noticed from your processors that you are using version 1.18, which should have RetryFlowFile.

 

If that helps please accept solution.

Thanks

Master Mentor

@Rohit1997jio 

I need to understand your use case more.

I agree with @SAMSAL that the Wait and Notify processors are not what you want to use here.

The ControlRate processor may also not be your best option. Assume multiple FlowFiles all route to failure. They will all reach the ControlRate processor at about the same time. The first goes right through if it has been 10 seconds since the last FlowFile passed through (so the first is not delayed 10 seconds at all). The FlowFiles queued behind it are then allowed through one every 10 seconds. So with multiple FlowFiles you now have an even larger gap between processing.
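
The arithmetic behind this can be sketched in a few lines of Python. This is a simplified model of "one FlowFile per interval" as described above, not ControlRate's actual implementation; `release_times` and its parameters are hypothetical names chosen for the example.

```python
def release_times(count: int, interval: float = 10.0, since_last_release: float = 10.0) -> list:
    """Seconds each queued FlowFile waits when only one may pass per interval.

    since_last_release is how long ago the previous FlowFile passed through.
    """
    # The first FlowFile only waits for whatever is left of the current interval.
    first_wait = max(0.0, interval - since_last_release)
    # Every FlowFile behind it waits one more full interval than the one before.
    return [first_wait + i * interval for i in range(count)]


# Three FlowFiles fail together; nothing has passed through for at least 10 s.
print(release_times(3))                          # [0.0, 10.0, 20.0]

# Same three FlowFiles, but another FlowFile passed through 4 s ago.
print(release_times(3, since_last_release=4.0))  # [6.0, 16.0, 26.0]
```

Under this model none of the FlowFiles is guaranteed a delay of exactly 10 seconds: the first can pass with no delay at all, and later ones wait progressively longer.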

I see that you are using CFM 2.1.5, based on Apache NiFi 1.18. In this version of NiFi, the ability to retry on a relationship is built into the processor framework. This feature is more powerful than the multiple-processor method in the dataflow you shared above, which requires more processors and more resources. There is also no way to guarantee an exact 10 second delay, so I hope this is a soft requirement.

Open the configuration for the PublishKafka processor and select the "Relationships" tab.
On the "failure" relationship you can check the box for "retry". When it is selected, additional configuration parameters appear. Here you can specify the number of retry attempts to make before the FlowFile is routed to the "failure" relationship. You can also control how the processor behaves while a retry is happening:
- Penalize (applies the penalty defined on the Settings tab of the processor to this FlowFile. The FlowFile will not be retried until the penalty expires. During the penalty duration the processor continues to process other FlowFiles.)
- Yield (same as Penalize, EXCEPT that no other FlowFiles are processed until this FlowFile is successfully processed or routed to the failure relationship. Useful when the processing order of FlowFiles is important.)

What this built-in retry will not do is apply the same penalty to every retry.
Let's say you set "Penalty Duration" to 10 secs on the Settings tab and configure "number of retry attempts" to 3. The first failure results in the FlowFile being penalized 10 secs, the second attempt is penalized 20 secs, and the third attempt is penalized 40 secs. This incremental back-off helps reduce resource usage when you have continuous failures (something else your flow will not do).
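
The 10, 20, 40 progression can be reproduced with a short Python sketch. It assumes the penalty simply doubles on each attempt, which matches the three values given above but is otherwise an assumption; `retry_penalties` is a hypothetical helper, not a NiFi API.

```python
def retry_penalties(base_seconds: float = 10.0, attempts: int = 3) -> list:
    """Penalty applied before each retry attempt, assuming it doubles every time."""
    return [base_seconds * 2 ** attempt for attempt in range(attempts)]


penalties = retry_penalties()
print(penalties)       # [10.0, 20.0, 40.0]
print(sum(penalties))  # 70.0 seconds of total penalty before routing to failure
```

Compared with a fixed 10 sec gap (30 secs in total for three retries), the built-in back-off spends longer waiting overall, so it is only a fit if the 10 second gap is a soft requirement.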

Once all retry attempts are exhausted without success, the FlowFile routes to failure, where you could use LogMessage and send the failed FlowFile down a different processing path like you did in your dataflow.

Thank you,
Matt

Contributor

The NiFi version I am using is old: Powered by Apache NiFi - Version 1.8.0.3.3.0.0-165. I have developed the same solution using the RetryFlowFile processor, but it is not working in my other environment due to the old NiFi version, so I need the same implementation there too.

@MattWho even the retry option on relationships is not there in this version.

I was trying to achieve this with Expression Language, but that is also not working.

 

 
