
NiFi consuming duplicate messages from Kafka

Contributor

NiFi version is 1.23.2 and Kafka version is 2.6. I use the ConsumeKafka_1_0 1.23.2 processor to read messages from Kafka, and the DetectDuplicate 1.23.2 processor to deduplicate FlowFiles on the same flow. However, the flow reads the same record twice and the DetectDuplicate processor cannot deduplicate the message. I understand it is the same message because the offset id value is identical in both messages. What could be the reason the ConsumeKafka_1_0 processor reads the same message twice and the DetectDuplicate processor cannot detect it?

plapla_0-1705994944172.png

plapla_1-1705995855058.png

plapla_2-1705995906341.png

10 REPLIES

Super Mentor

@plapla 

Couple things here:

1. You should be using the ConsumeKafka processor that matches your Kafka server version. If your Kafka server is 2.6, you should be using the ConsumeKafka_2_6 processor.

2. In your DetectDuplicate processor you are using ${key}. How is this attribute being created on each FlowFile, and where is its value derived from?

Thanks,
Matt

Contributor

Thanks for your reply @MattWho 

1. I confirmed the Kafka version; it is 2.5. In this case, which version of the processor should I use?

2. I transform the content into the key attribute with the EvaluateJsonPath processor. Then I use the key value in the DetectDuplicate processor as I share below. I also checked the key attribute of the FlowFile that cannot be deduplicated. Could this be related to the FlowFiles arriving at the same time?

plapla_0-1706080906260.png

plapla_1-1706080942033.png
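
For reference, a minimal sketch of the configuration being described, assuming the key is pulled from a JSON field (the field name and the path "$.id" are only placeholders, since the actual message format is not visible here):

    EvaluateJsonPath
      Destination               : flowfile-attribute
      key                       : $.id        (hypothetical JSON path to the unique field in the message)

    DetectDuplicate
      Cache Entry Identifier    : ${key}
      Distributed Cache Service : DistributedMapCacheClientService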

 

Contributor

Hi @MattWho ,

Do you have any comments on the subject?

Super Mentor

@plapla 

Since Apache NiFi does not have a ConsumeKafka processor built with the Kafka 2.5 client, I would recommend going with the client version closest to, but not newer than, the Kafka server version you are using. In this case that is the ConsumeKafka_2_0 processor.

1. Is your NiFi a standalone instance install or a multi-node NiFi cluster setup?
2. How are your DistributedMapCacheClient and DistributedMapCacheServer controller services configured?
3. What is the rate of FlowFiles being produced by your ConsumeKafka processor?

I see that you configured the DetectDuplicate processor with an age off of 420 days; however, the DistributedMapCacheServer has a configured maximum number of cache entries, with a default of 10,000. So, possibly due to volume, cache entries are being evicted from the cache server, resulting in issues detecting duplicates. The DistributedMapCacheServer also holds all cache entries in NiFi heap memory, which makes it a poor choice for high-volume caches (because of the heap usage), and it offers no high availability. This becomes even more of an issue with a NiFi cluster. You would be better off using an external map cache server.
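
For reference, the relevant DistributedMapCacheServer properties look roughly like this (defaults shown; the values in any given flow may differ):

    DistributedMapCacheServer
      Port                  : 4557
      Maximum Cache Entries : 10000       (once exceeded, older/less-used entries are evicted)
      Eviction Strategy     : Least Frequently Used
      Persistence Directory : (empty, meaning entries are held only in NiFi heap memory)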

If you are using a NiFi cluster, make sure your DistributedMapCacheClient is configured to connect to one specific DistributedMapCacheServer. I have seen misuse here where individuals configure the client to connect to localhost, so that each NiFi node talks to its own host's map cache server. The map cache servers running on each node do not share data.
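
In other words, every node's DistributedMapCacheClientService should point at one and the same server, something like the following (the hostname is just an example):

    DistributedMapCacheClientService   (configured identically on every node)
      Server Hostname        : nifi-node1.example.com     (one specific node, never localhost)
      Server Port            : 4557
      Communications Timeout : 30 secs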

Hope this helps you...

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Contributor

Hi @MattWho ,

I updated the processor to ConsumeKafka_2_0.

1. My NiFi is not clustered; it consists of a single node.

2. The DistributedMapCacheClient and DistributedMapCacheServer are configured as I share below:

plapla_0-1706624669008.png

plapla_1-1706624707586.png

3. The ConsumeKafka processor reads approximately 200 FlowFiles in 5 minutes.

I configured the DetectDuplicate processor with an age off of 10 days. It doesn't matter so much that the processor can't deduplicate the messages; the real problem is that ConsumeKafka reads the same message twice.

Finally, I changed the Max Poll Records property on ConsumeKafka from 10,000 to 200.
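
As a rough sanity check on the cache sizing discussed earlier, using the numbers from this thread (and assuming a steady rate):

    200 FlowFiles / 5 minutes                      =  ~40 FlowFiles per minute
    10,000 default cache entries / 40 per minute   =  ~250 minutes, i.e. roughly 4 hours

so with the default Maximum Cache Entries the cache would cycle in a matter of hours, far short of a 10-day age-off.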

 

 

 

Super Mentor

@plapla 
The ConsumeKafka processor should not be reading the same message twice. The processor should be maintaining local state (since you are not clustered) in NiFi's local state directory. Make sure that you are not having disk space or permission issues that may prevent the writing of that local state. You can right-click on the ConsumeKafka processor to view the currently stored state.
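
For reference, the local state directory mentioned above is configured in conf/state-management.xml; the default local provider looks like this (paths may differ per install, and other default properties are omitted for brevity):

    <local-provider>
        <id>local-provider</id>
        <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
        <property name="Directory">./state/local</property>
    </local-provider>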

The ConsumeKafka processor creates a consumer group using the Group ID configured in the processor, so make sure you do not have multiple ConsumeKafka processors consuming from the same Kafka topic using the same Group ID. For optimal performance, the number of concurrent tasks configured on the ConsumeKafka processor should match the number of partitions on the target topic.

Do you see any Kafka rebalancing going on? That will happen when you have more consumers than partitions in the consumer group that is consuming from that topic. A rebalance can affect the commit of the offset, resulting in possible data duplication.
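
If you have access to the Kafka broker host, one way to check for this is to describe the consumer group from the Kafka side (the broker address is an example; the group name is whatever GroupID is configured on the processor):

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <your-group-id>

The output lists each partition with its current offset, lag, and the consumer it is assigned to; adding --members lists the group members and how many partitions each one holds, which makes an over-subscribed group (more consumers than partitions) easy to spot.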

Thanks,

Matt

Contributor

@MattWho 

This situation is not caused by the ConsumeKafka processor reading the same message twice, nor by the DetectDuplicate processor failing to catch a duplicate. It is caused by the PutElasticsearchHttp processor adding the data to the Elasticsearch cluster twice. The PutElasticsearchHttp processor receives a timeout error and the FlowFile goes to the failure relationship. When the PutElasticsearchHttp processor receives a successful response, the FlowFile is removed from the flow, but the record ends up in Elasticsearch twice. What could be the reason for this?

Super Mentor

@plapla 

This sounds like the PutElasticsearchHttp processor is working as designed. It is putting to Elasticsearch over HTTP, and Elasticsearch is successfully processing that request; however, Elasticsearch is not responding to the original HTTP request before the timeout occurs. As a result, PutElasticsearchHttp routes the FlowFile to failure. The question here is: what are you doing with the failure relationship? If you configured a retry or looped the failure relationship via a connection back to the PutElasticsearchHttp processor, the same FlowFile would be processed a second time.

You may be able to solve this by simply increasing the configured "Response Timeout" on the PutElasticsearchHttp processor. But you may also want to look at the particular files that encounter this issue and see if there are any consistencies across them, such as larger sizes, time of day, load on Elasticsearch at the time, number of concurrent pending requests on the Elasticsearch side, network load, etc.
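
As a rough sketch of the pattern being described (relationship names as they appear on PutElasticsearchHttp; the timeout value is only an example):

    PutElasticsearchHttp
      Response Timeout : 60 secs    (example value, raised so a slow Elasticsearch reply is not treated as a failure)
      success  -> downstream flow
      retry    -> penalize / wait, then back to PutElasticsearchHttp
      failure  -> if this is looped straight back to the processor, a request that Elasticsearch
                  actually completed (but answered after the timeout) gets sent and indexed a second time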

If you found any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Contributor

@MattWho 

The PutElasticsearchHttp processor's Connection Timeout is 5 secs and its Response Timeout is 15 secs. Would it be appropriate to double these values?