Created 11-28-2023 04:33 AM
Hi,
I have a PublishKafka processor with a self-loop, and the FlowFile expiration time for that loop is 1 hour. If a FlowFile stays in the queue for more than 1 hour, it is removed.
I want to log a message (e.g. "dropping the flow file") every time a FlowFile is removed from the queue after expiring.
This is the flow -
Created on 11-28-2023 09:17 AM - edited 11-28-2023 09:20 AM
Hi @Rohit1997jio ,
Not sure if this can be done using FlowFile Expiration. However, if you are using NiFi 1.16 or higher, you can take another approach using the "retry" option on the target processor's failure relationship, as follows:
The concept here is to use the settings "Number of Retry Attempts", "Retry Back Off Policy" and "Retry Maximum Back Off Period" to configure how often and for how long the FlowFile is retried before it gets pushed to the failure relationship queue, where you can then log the needed message. On every failed retry, the FlowFile is pushed back to the upstream queue and waits the designated time before it is tried again. The challenge is how to set those values so that the FlowFile is only kept for a certain period of time (1 hour in your case), especially since the FlowFile waits in the queue before it is tried again, depending on whether you set the policy to Penalize or Yield. That wait is a good thing, because you want some delay before the FlowFile is tried again to avoid a lot of overhead. For example, if you want the FlowFile to expire in an hour and you want to try it 60 times, waiting 1 min before each retry, you can set the values as follows:
Number Of Retry Attempts: 60
Retry Back Off Policy: Penalize (set the Penalty Duration under the Settings tab to 1 min)
Retry Maximum Back Off Period: 1 min (this ensures that the wait time in the queue doesn't exceed the initial penalty duration, because the penalty duration is doubled on every subsequent retry, not sure why)
In this case the FlowFile will be retried 60 times upon failure. Each time, the FlowFile is pushed back to the upstream queue and waits at most 1 min before the next retry, which makes the total time the FlowFile is retried = 60 * 1 min = 60 mins = 1 hour.
Depending on how often you want to retry and how long you want to wait before the next retry, you can adjust those numbers accordingly.
Once all the retries are done, the FlowFile will be moved to the failure relationship, where you can log the final message.
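To sanity-check other combinations of these three settings, the arithmetic above can be sketched in a few lines of Python. This is only a rough model based on the behaviour described in this post (the wait starts at the penalty duration, doubles on each subsequent retry, and is capped at the maximum back off period); it is not taken from the NiFi source, and the function names are made up for illustration.

```python
from datetime import timedelta


def retry_wait_schedule(attempts, penalty, max_backoff):
    """Return the modelled wait before each retry.

    Assumed model (from the description in this thread, not NiFi code):
    the first wait is the penalty duration, every later wait is double
    the previous one, and no wait exceeds the maximum back off period.
    """
    waits = []
    wait = min(penalty, max_backoff)
    for _ in range(attempts):
        waits.append(wait)
        # Double for the next retry, but never go past the cap.
        wait = min(wait * 2, max_backoff)
    return waits


def total_retry_time(attempts, penalty, max_backoff):
    """Sum of all modelled waits, i.e. roughly how long the FlowFile is kept."""
    return sum(retry_wait_schedule(attempts, penalty, max_backoff), timedelta())


one_min = timedelta(minutes=1)

# Settings from this post: 60 attempts, 1 min penalty, 1 min cap.
print(total_retry_time(60, one_min, one_min))

# Same penalty with a 10 min cap: waits of 1, 2, 4, 8 and 10 minutes.
print(total_retry_time(5, one_min, timedelta(minutes=10)))
```

With the cap equal to the penalty duration, the total is simply attempts times penalty, which is why the 60 x 1 min setup gives one hour. A larger cap makes the total grow faster than that, so the number of attempts would need to be lowered to stay within the same window.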
If that helps, please accept the solution.
Thanks
Created 11-28-2023 07:28 PM
Thanks @SAMSAL, it's working, but I am not able to understand the settings.
My requirement is: my max queue size will be 250 requests, and I want them to be retried every hour for up to 24 hours. Once a FlowFile has spent 24 hours in the queue, I want it to expire, and I will drop it by connecting it to a LogMessage processor.
Can you please help with these settings?
Created 11-29-2023 06:52 AM
@Rohit1997jio 
You could use the RetryFlowFile processor for this use case. 
You will feed the "failure" relationship via a connection to the RetryFlowFile processor. The RetryFlowFile processor will continue to route the FlowFile back to PublishKafka using the "retry" relationship until the configured maximum number of retries has been exceeded. After the max retries has been reached, the FlowFile will instead route to the "retries_exceeded" relationship, which you can connect to a LogMessage processor. The LogMessage processor would then auto-terminate the "success" relationship.
The challenge you have here is your requirement to retry once per hour for 24 hours. You could set the penalty duration in PublishKafka to 1 hour. This means that FlowFiles routed to the "failure" relationship would get penalized for 60 mins. The RetryFlowFile processor would not consume such a FlowFile from its input connection until the penalty duration has ended. Then configure the number of retries in the RetryFlowFile processor to 24. Be careful with setting the queue size to 250 on the failure connection. If you reach 250 queued on the failure relationship, it will trigger backpressure on the PublishKafka processor, meaning the PublishKafka processor would not get scheduled again until that backpressure is gone.
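As a rough illustration of how the 1 hour penalty and the 24 retries combine, here is a small Python simulation of a FlowFile that fails on every publish attempt. It is a simplified model, not NiFi code: the counter logic, the attribute name `flowfile.retries` and both function names are assumptions made for this sketch.

```python
def route_retry(attributes, max_retries, retry_attr="flowfile.retries"):
    """Simplified model of a retry counter kept in a FlowFile attribute.

    Returns the relationship name and the (possibly updated) attributes.
    """
    attempt = int(attributes.get(retry_attr, 0)) + 1
    if attempt > max_retries:
        # Counter would go past the limit: stop retrying.
        return "retries_exceeded", attributes
    updated = dict(attributes)
    updated[retry_attr] = str(attempt)
    return "retry", updated


def simulate_always_failing(max_retries=24, penalty_hours=1):
    """Follow one FlowFile that fails every time it is published.

    Assumes every failure is penalized for penalty_hours before the
    retry logic sees the FlowFile again.
    """
    attributes = {}
    elapsed_hours = 0
    retry_times = []  # hour offsets at which the FlowFile is sent back
    while True:
        elapsed_hours += penalty_hours  # wait out the penalty
        relationship, attributes = route_retry(attributes, max_retries)
        if relationship == "retries_exceeded":
            return retry_times, relationship
        retry_times.append(elapsed_hours)


times, final_relationship = simulate_always_failing()
print(len(times), "retries, first at hour", times[0], "last at hour", times[-1])
print("final relationship:", final_relationship)
```

In this model the FlowFile is sent back for retry once per hour, 24 times, and only after that does it reach "retries_exceeded", where the LogMessage processor would pick it up.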
If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 11-29-2023 06:16 AM
Since you said that my proposed solution works for the first part, can you accept the solution and then open a new ticket for the latest question, because it's a little different from your first question? Also, regarding the requirement in your latest post, I'm having a hard time understanding what you are trying to do, and I have the following questions that I hope you can answer or clarify if you decide to open a new ticket:
1- What do you mean by max queue size is 250? How do you set that? Is it a batch process where each batch processes a total of 250, or should you never have more than 250 FlowFiles at a given time?
2- You say that you want them retried for 24 hours. Is this for all 250 FlowFiles? If so, how is this going to work when you say that you want to retry a FlowFile every hour for 24 hours? This is confusing to me.
3- Are you saying that you want to log a message after all the files in the queue have completed their 24 hours of retries? Do you mean when all 250 FlowFiles fail and have been retried? Then how is this going to work when some files succeed and others fail?
It would also help if you post a screenshot of the complete flow and highlight what you want to do in each processor and which part of the flow you are having a problem with, detailing clearly what the problem is in relation to the target processor (PublishKafka in your case) and what the expectation is.
Thanks