Apache Nifi PublishKafka Retry Mechanism in case of failure

New Contributor

I want to handle errors from the PublishKafka processor in Apache NiFi, in case Kafka goes down, using a retry mechanism. The retry mechanism needs to be implemented in a separate flow (to remove load from the main flow).

NiFi version = 1.18.0

So, in case of failure, the PublishKafka processor should pass the flow file to the failure relationship. This works properly when the count of incoming flow files is low.

Example - initial image: a few flow files generated for the failure case

[screenshot: flow files queued for the failure case]

Final image, after some time - flow files successfully directed to the failure relationship:

[screenshot: flow files routed to the failure relationship]

As the number of incoming flow files increases, the flow files are not routed to the failure relationship by the PublishKafka processor (which is the real-life scenario: if Kafka goes down, the PublishKafka processor will apply back pressure to incoming flow files).

Here are my PublishKafka processor configs:

[screenshot: PublishKafka processor configuration]

I tried using the built-in retry mechanism of the NiFi processor:

[screenshot: retry configuration on the failure relationship]

But the issue is still not resolved; the PublishKafka processor does not forward flow files to the failure relationship in case of error.


Community Manager

@NIFI-USER Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @steven-matison and @SAMSAL, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator



Master Mentor

@NIFI-USER 

I am trying to fully understand your use case.

So you have a primary Kafka and a backup Kafka for failover?
This feels like an odd setup to me.

So you are trying to plan for a use case where the entire primary Kafka cluster is down, and NiFi will fail over to publishing to an entirely different Kafka cluster?

Option 1:
I noticed that when you tried to use "retry" on the "failure" relationship, you configured it to use "Yield".  When a FlowFile fails, in this case it yields the processor execution for 1 sec (the default yield duration from the settings tab) and the FlowFile remains in priority slot 1.  So after the yield duration, that same FlowFile is attempted again. If it fails again, a yield of 1 minute and 1 sec is applied to the processor before the FlowFile is processed again, then on the next retry that gets doubled to 2 mins and 2 secs, then doubled to 4 mins and 4 secs, etc.  With the yield retry policy, processing of the next FlowFile in the inbound connection is blocked until the first has failed the configured number of retries.  This can really slow the process of moving FlowFiles to the connection containing the "failure" relationship.
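
To make that timing concrete, here is a rough Python sketch of the escalation (just my illustration of the behaviour described above, assuming the default 1 sec yield duration; it is not NiFi source code):

# Illustrative only: per-retry yield applied before each reattempt of the same
# FlowFile, following the pattern described above (1 sec, 1 min 1 sec, 2 min 2 sec, ...).
def yield_delays(num_retries, base_yield_s=1):
    delays = []
    delay = base_yield_s
    for attempt in range(num_retries):
        delays.append(delay)
        delay = 60 + base_yield_s if attempt == 0 else delay * 2
    return delays

print(yield_delays(4))       # [1, 61, 122, 244] seconds
print(sum(yield_delays(4)))  # 428 seconds before this one FlowFile is finally routed to
                             # "failure", during which the rest of the queue is blocked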

Option 2:
Instead, you could try the "penalize" retry policy.  This policy applies a penalty time to the FlowFile when it fails and leaves it in the inbound connection queue.  It does not yield the processor, so the processor continues to get scheduled to execute.  Any "penalized" FlowFiles are ignored until the penalty duration expires.  The default penalty duration is 30 secs, configured on the settings tab (you may want to reduce this value for your use case). It also doubles the length of penalization with each subsequent retry.  After the configured number of retries, the FlowFile is moved to the outbound connection containing the failure relationship.
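
A similar rough sketch for the penalize timing (again only an illustration, assuming the default 30 sec penalty that doubles with each retry as described above):

# Illustrative only: penalty applied to one FlowFile after each failed attempt.
# The processor is not yielded, so other FlowFiles keep being processed meanwhile.
def penalty_delays(num_retries, penalty_s=30):
    delays = []
    delay = penalty_s
    for _ in range(num_retries):
        delays.append(delay)
        delay *= 2
    return delays

print(penalty_delays(3))  # [30, 60, 120] seconds of penalty for retries 1..3 of one FlowFile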

Both the above strategies allow for unexpected failures to still have an opportunity to succeed to your primary Kafka with some delay.

Option 3:
The alternative is to stop using the "retry" capability on the failure relationship.  This means that every FlowFile that fails will immediately fail, get penalized, and be transferred to the "failure" relationship.  So in a scenario where this is only a temporary unexpected failure, you'll have some FlowFiles going to your backup Kafka cluster and others still going to your primary Kafka cluster.  FlowFiles routed to failure are automatically penalized as well. Since you are not looping the failure relationship's connection back to the source processor, you may also want to set the penalty duration to 0 sec in the settings tab so that the downstream secondary Kafka cluster processor will be able to execute on the FlowFile immediately instead of needing to wait for that penalty to expire.

No matter which option above you choose, it is important to adjust the penalty and/or yield duration settings to meet your use case's needs.



Thank you,
Matt

New Contributor

@MattWho  thanks for the reply 🙂.

Yes, I have two different Kafka clusters (i.e. the apk Kafka cluster and the dap Kafka cluster). I am handling errors of the apk Kafka cluster, in case it goes down, by sending the failed flow files to a topic on the dap Kafka cluster.

In case of failure, I tried a few other things and found a trend. If the apk Kafka cluster goes down, then NiFi tries to connect to the apk Kafka cluster at an interval of 5000 ms, as shown in the picture below.

[screenshot: connection attempts logged every 5000 ms]

One more thing I observed: if I have "x" incoming flow files to the PublishKafka processor, then it somehow groups up the flow files and takes 5*x seconds for them to go to the failure queue.

Here is a photo to support the statement above. As you can see, on the right-hand side the PublishKafka processor has 50 incoming files and is showing Tasks/Time = 1/00:04:10, which is 4 mins and 10 secs = 250 secs.

[screenshot: 50 incoming flow files, Tasks/Time = 1/00:04:10]

And in case of a huge number of incoming flow files, the maximum clubbed batch of flow files is 500, and the rest of the flow files stay waiting in the incoming queue of the PublishKafka processor.

Below, an image showing Tasks/Time = 41 mins 40 secs = 500*5 = 2500 secs for the first batch of 500 flow files.

[screenshot: first batch of 500 flow files, Tasks/Time = 41 mins 40 secs]

Similarly, the second group of 500 flow files takes the same time to go into failure (i.e. 2500 secs).

[screenshot: second batch of 500 flow files, same timing]

Also, if I set the number of retries greater than 0, the interval still remains the same (i.e. 5 secs) and the clubbing up of flow files also happens. Let's say the number of retries is "n"; then it takes (n+1)*5*x seconds to go into the failure queue (x is the number of incoming flow files).
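
Putting those measurements into a quick formula (this is just arithmetic on the numbers observed above; the ~5 secs per flow file is the observed reconnect interval, not a documented NiFi constant):

# Observed time for a batch to reach the failure relationship:
# (retries + 1) passes of roughly 5 secs per flow file.
def time_to_failure_s(num_flowfiles, num_retries=0, per_file_s=5):
    return (num_retries + 1) * per_file_s * num_flowfiles

print(time_to_failure_s(50))   # 250 secs  -> matches Tasks/Time = 1/00:04:10 above
print(time_to_failure_s(500))  # 2500 secs -> matches the ~41 mins 40 secs batches above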

Master Mentor

@NIFI-USER 

Are you seeing the same behavior even when not using the retry strategy of "yield"?
What about when retry is not checked?  FlowFiles, upon failure, should immediately be transferred to the connection containing the failure relationship.

What are your penalty and yield settings set to on your PublishKafkaRecord_1_0?
What version is your target Kafka (you are using a rather old Kafka client version 1.0)?

As far as your Kafka topic goes, how many partitions on the topic?
How many concurrent tasks set on PublishKafkaRecord?
How many nodes in your NiFi cluster?

Thanks,
Matt

New Contributor

Yes, almost the same behavior is observed with the retry strategy set to "penalize". Just the additional penalty duration gets added to the time.

For example, by default the penalty duration is 30 secs. If the incoming flow files are 10 and the number of retries is 1, the 10 flow files are clubbed up and the first retry happens at 50 secs. Then for 30 secs it penalizes the clubbed flow files. Then after another 50 secs they go into the failure relationship.

So, with the penalize retry policy, the total time taken by the PublishKafka processor to route files into the failure relationship is roughly (numberOfRetries+1) * 5 secs * (numberOfIncomingFlowFiles) + penalty duration.
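
As a quick sanity check of that formula against the example above (arithmetic only, and only for the single-retry case, since the penalty doubles on further retries as Matt described):

# Penalize retry policy: (retries + 1) publish passes of ~5 secs per flow file,
# plus the penalty window (shown here for the single-retry example above).
def penalize_time_to_failure_s(num_flowfiles, num_retries=1, per_file_s=5, penalty_s=30):
    return (num_retries + 1) * per_file_s * num_flowfiles + penalty_s

print(penalize_time_to_failure_s(10))  # 50 + 30 + 50 = 130 secs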

 

If retry is not checked, then behavior similar to yield is observed: 5*numberOfIncomingFlowFiles secs to route to the failure relationship, as shown in the photos.

[screenshots: timing with retry unchecked]

Penalty and yield settings are at their defaults.

[screenshot: default penalty and yield settings]

The target Kafka version is 3.4.0 and the number of partitions is 1.

The number of NiFi nodes is 3.

The number of concurrent tasks on PublishKafkaRecord is 1, but execution is on all nodes, which I think means 1 thread on each of the 3 nodes.

[screenshot: scheduling settings with 1 concurrent task, execution on all nodes]