How to fix data uniqueness for two different nifi flow consuming from same RETRY topics

Contributor

Hi,

In my NiFi setup I have two dataflows that send the same data to two different HTTP endpoints. Both dataflows consume from the same Kafka topic (Topic A), each with its own Kafka group ID, and each pushes to a different HTTP endpoint (endpoint 1 and endpoint 2). On failure, the data is published to retry topic R, which is shared by both dataflows. Each dataflow has its own retry dataflow configured with its own HTTP endpoint, but both retry dataflows consume from the same retry topic.

The problem is that, because the retry topic is shared, the retry flow for dataflow A can consume data that failed in dataflow B. How can I avoid this?

The retry topic must be the same for both.

 

Here is a sample of the dataflow: RetryNifi.jpg

2 REPLIES

Master Mentor

@Rohit1997jio 

You have Topic A with your source messages. You have two consumer groups, each pulling all the messages from Topic A. While both dataflows consume the same messages, each may fail on different messages at the InvokeHTTP step.

You are then writing the FlowFiles that failed InvokeHTTP to another topic, R, which both consumer groups consume from. So both consumer groups get a copy of every message written to that topic. Your dataflow is working exactly as designed.

You must keep your retry logic independent of one another.  

I also don't understand the overhead of ingesting the same messages twice in your NiFi.

Why not have a single ConsumeKafka ingest the messages from the topic and then route the success relationship from the ConsumeKafka twice (once to InvokeHTTP A and once to InvokeHTTP B)?

Why publish failed or retry FlowFile messages to an external topic R just so they can be consumed back into your NiFi? It would be more efficient to just keep them in NiFi and create a retry loop on each InvokeHTTP. NiFi even offers retry handling directly on the relationships within the processor configuration.
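
To illustrate the in-flow retry idea, here is a rough Python sketch of a bounded retry with exponential backoff. It is not NiFi configuration; `send_with_retry`, `send`, `max_attempts`, and `base_delay` are hypothetical names, and the final `False` return stands in for routing the FlowFile to a failure path once retries are exhausted.

```python
import time
from typing import Callable


def send_with_retry(send: Callable[[], bool], max_attempts: int = 3,
                    base_delay: float = 0.5,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call send() until it succeeds or max_attempts is reached.

    Waits base_delay, then 2x, 4x, ... between attempts. Returns True on
    success and False when retries are exhausted, so the caller can route
    the item to a failure path instead of retrying forever.
    """
    for attempt in range(max_attempts):
        if send():
            return True
        # Only wait if another attempt is still to come.
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return False


# Simulated endpoint that fails twice, then accepts the request.
# (Hypothetical stand-in for an HTTP call; delays are recorded, not slept.)
outcomes = iter([False, False, True])
delays = []
result = send_with_retry(lambda: next(outcomes), sleep=delays.append)
print(result, delays)
```

Capping the number of attempts is what keeps the queue from growing without bound when an endpoint stays down; what happens to exhausted items is a separate design decision.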

If you must write the messages out to just one topic R, you'll need to append something to each message that indicates which InvokeHTTP (A or B) failure or retry resulted in it being written to Topic R. Then have a single retry dataflow that consumes from Topic R and extracts that A or B identifier from the message so that it can be routed to the correct InvokeHTTP. It just seems like a lot of unnecessary overhead.
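
The tag-and-route idea can be sketched in plain Python, independent of NiFi. This is only an illustration of the logic: the field name `retry_source`, the labels `A` and `B`, the `unmatched` route, and both helper functions are hypothetical. In NiFi, the tagging step would sit before the publish to Topic R and the routing step after the single consumer on Topic R.

```python
import json
from typing import Tuple

# Hypothetical labels for the two flows that share retry topic R.
FLOW_A = "A"
FLOW_B = "B"


def tag_for_retry(payload: str, source_flow: str) -> str:
    """Wrap a failed payload with the flow that failed, before publishing to R."""
    return json.dumps({"retry_source": source_flow, "payload": payload})


def route_retry(message: str) -> Tuple[str, str]:
    """Read the tag from a message consumed from R.

    Returns (route, payload). Messages without a valid tag go to an
    'unmatched' route with the raw message, rather than being guessed.
    """
    try:
        envelope = json.loads(message)
    except json.JSONDecodeError:
        return ("unmatched", message)
    if (not isinstance(envelope, dict)
            or envelope.get("retry_source") not in (FLOW_A, FLOW_B)
            or "payload" not in envelope):
        return ("unmatched", message)
    return (envelope["retry_source"], envelope["payload"])


# A failure in flow B is tagged, published to R, and later routed back to B only.
retry_message = tag_for_retry('{"id": 42}', FLOW_B)
print(route_retry(retry_message))
```

Wrapping the payload in an envelope changes the message body, so the retry flow has to unwrap it before calling the endpoint. Carrying the tag outside the body (for example as a Kafka record header) would avoid that, assuming the publisher and consumer in use are configured to pass headers through.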

Please help our community thrive. If any of the suggestions or solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them.

Thank you,
Matt

 

Contributor

Hi @MattWho 

Thanks for your response.

"I also don't understand the overhead of ingesting the same messages twice in your NiFi."

My requirement is to send the data to different endpoints so that each can perform different operations on it.

"Why not have a single ConsumeKafka ingest the messages from the topic and then route the success relationship from the ConsumeKafka twice (once to InvokeHTTP A and once to InvokeHTTP B)?"

For me, one flow corresponds to one vendor. I will have many vendors, and each will have its own separate endpoints, so keeping everything in one flow is not possible. That is why I am creating a separate dataflow and separate retry logic for each.

The issue above affects only one vendor, which requires the same data (consumed from the same Kafka topic) to be pushed to two separate endpoints. I am not able to handle the retry logic for that case.

"Why publish failed or retry FlowFile messages to an external topic R just so they can be consumed back into your NiFi?"

Yes, I want them to be consumed back into NiFi. I publish all failed requests to the retry topic, and they are handled in the retry flow. This keeps my main flow free of failed requests, so new requests that have no errors are pushed to the endpoint successfully.

"It would be more efficient to just keep them in NiFi and create a retry loop on each InvokeHTTP. NiFi even offers retry handling directly on the relationships within the processor configuration."

If I add a retry loop to InvokeHTTP and the endpoint is down for a long time, too many requests will queue up in NiFi.

"If you must write the messages out to just one topic R, you'll need to append something to each message that indicates which InvokeHTTP (A or B) failure or retry resulted in it being written to Topic R. Then have a single retry dataflow that consumes from Topic R and extracts that A or B identifier from the message so that it can be routed to the correct InvokeHTTP. It just seems like a lot of unnecessary overhead."

Please help me with the retry logic. The data goes into the same retry topic, so how can I tell whether a message failed in dataflow 1 or in dataflow 2?