Support Questions

Find answers, ask questions, and share your expertise

Why is DetectDuplicate not filtering duplicate ${filename}? Why is ListS3 giving me duplicate messages?

Explorer

I sure hope someone can help me, because otherwise I'm sunk. I'm using the ListS3 processor to feed a FetchS3 processor, but when I list my test directory, which contains one file, I get 4 messages in my queue instead of one. (I also get 4 messages with the directory name, but I filter that out.) The 4 messages have the same filename and I can't find any reason why this is happening.

I searched the site but could find no mention of duplicate messages (or flows) from the ListS3 processor, which is odd because it has been so easy to create this condition. Multiple developers encountered it. It wasn't just one flow. I encountered it when creating flows on a Linux server, version 1.6, and on my local Windows 10 machine, version 1.8. It does it every time with various buckets.

If I examine the queue in a connection between processors, I can see the 4 messages and that they all have the same filename. The weirdest thing is that position is the same for each of the 4 messages. When I list the directory, I see 4 messages with position 1 for the directory. And 4 messages with position 2 for the file.

I had wondered if this meant that it really was only 1 message that was displayed in a strange way, but I don't think so. First off, each message has a unique UUID. Second, after the FetchS3 I do a PutSFTP and the first message succeeds and then the next 3 fail with failure to rename dot-file and then failure to delete it when attempting to clean up. So it must be trying to copy the same file 4 times in parallel and the last 3 are locked/conflicted and fail.

It is strange though. I changed the Back Pressure to 1 and saw that 4 messages show up in the queue. I have to start the next processor to get the next group of 4 into the queue. It never stops at just one message.

So OK, I'm assuming the messages are real. I try using the DetectDuplicate processor. It takes me a few tries to get set up, but it eventually says it is ready to go. For Cache Entry Identifier I put ${filename}. Seems pretty straightforward. If any messages come in with the same filename before the Age Off Duration has passed, they should be marked as duplicate. But they always go through as non-duplicate. Always.

I tried Cache The Entry Identifier as both true and false. No effect.

I wondered if I had the Distributed Cache Service set up correctly. My first attempt creating it at the global scope said there was a conflict with the port, and after more reading I saw that I could make Services at my processor group scope. In my processor group, I created the controller services DistributedMapCacheClientService and DistributedMapCacheServer and enabled them. I've tried them at the default port 4557. I even tried changing the port on both of them to 4558 to see if that helped. No effect.

Is there something I'm missing? Seems pretty straightforward. Define one of each type on the same port. I thought it was a little strange that I don't tell the two services about each other. I guess just giving them the same port connects them?

I don't understand why I always get 4 messages per file. I even tried setting the global NiFi settings to 1 for both Maximum Timer Driven Thread Count and Maximum Event Driven Thread Count, but it still always ran with a 4 next to the activity indicator on the processor (the symbol that says it is doing something).

And I don't understand why my DetectDuplicate doesn't filter out the duplicates. It seems quite simple. Put ${filename} in that field and every message marked non-duplicate should have a unique filename. But I get nothing in the duplicate queue.
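For reference, the routing behavior described above can be sketched roughly as follows. This is an illustrative Python model of what DetectDuplicate is expected to do with a cache keyed on the filename, not NiFi's actual implementation; the class and method names are made up for the sketch.

```python
import time

class DuplicateDetector:
    """Illustrative model: the first FlowFile with a given cache key routes
    to 'non-duplicate'; later FlowFiles with the same key route to
    'duplicate' until the Age Off Duration has passed."""

    def __init__(self, age_off_seconds=600):
        self.age_off = age_off_seconds
        self.cache = {}  # cache key -> time the key was first seen

    def route(self, cache_key, now=None):
        now = time.time() if now is None else now
        first_seen = self.cache.get(cache_key)
        if first_seen is not None and now - first_seen < self.age_off:
            return "duplicate"
        # Key is new (or its entry aged off): record it and pass it through.
        self.cache[cache_key] = now
        return "non-duplicate"

detector = DuplicateDetector(age_off_seconds=600)
print(detector.route("test/file.csv"))  # non-duplicate
print(detector.route("test/file.csv"))  # duplicate
```

In a cluster this only works if every node consults the same shared cache, which is the crux of the problem described in the replies below.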

Does anyone have any idea what is going on?

1 ACCEPTED SOLUTION

Master Guru

@Kevin Lahey

Not sure if you are using a NiFi cluster or not; could you try running the ListS3 processor on the Primary Node only?

Per the documentation, this processor is intended to run on the primary node only.


6 REPLIES

Master Guru

@Kevin Lahey

Not sure if you are using a NiFi cluster or not; could you try running the ListS3 processor on the Primary Node only?

Per the documentation, this processor is intended to run on the primary node only.

Master Mentor

@Kevin Lahey

I completely agree with @Shu. It sounds like you have the ListS3 processor executing on all 4 nodes in a NiFi cluster. This results in each NiFi node listing the same filename. Each node then tries to look up that filename in the distributed cache used by the DetectDuplicate processor. This results in a bit of a race condition between your nodes, where one or more nodes fails to find the entry in the cache before one of the nodes adds the new filename to that cache.

-

Your flow should run the ListS3 processor with its success relationship feeding a FetchS3 processor. The connection between those two processors should be configured to load-balance the listed files across all nodes in the cluster.

-

Thanks,

Matt

Explorer

@Matt Clarke @Shu

Thanks. You and Shu are correct, but I still don't understand why my DetectDuplicate processor didn't work. If you could continue to indulge me, I have a few more questions for you.

** 1.) For testing purposes, I left the ListS3 processor the same and set the DetectDuplicate processor to Primary Node. DetectDuplicate only sent one copy of each file further on, but it simply ignored the other 3 copies of the messages and left them in the queue. So I guess it processed only messages produced on the local server. I had assumed the queues work like message queues, where any of the nodes could pull any messages off the preceding queue. So I'm confused.

Is the DetectDuplicate processor different in this regard? Or maybe it has to do with my DistributedMapCacheClientService being set to localhost? Does each node keep its own copy of the Cache?

** 2.) More questions about the DistributedMapCaches. I found a reply from 2016 by Pierre Villard. He said:

It is advised to run the DistributedMapCacheServer on the NCM, then, in DistributedMapCacheClientService, instead of localhost, you can use the IP address of your NCM.

OK. I guess this is saying that I should set up one CacheServer on one node, the NCM, and then point the CacheClientService to the NCM? How do I get the CacheServer on a specific node? Is that what clicking in the upper right corner and selecting Controller Settings does? If I put the CacheServer there it puts it on the NCM? And then I set up the CacheClientService in the Processor Group? I'm just guessing here.

If I put the CacheServer in the Processor Group, will that cause NiFi to make separate caches on each node?


** 3.) I guess I should also ask how I figure out which server or IP address is the NCM? Probably set in a config file somewhere. That is probably in the documentation.


** 4.) If I have several different Processor Groups running, do I have to worry about port conflicts between groups? If I set up DistributedMapCacheServers in more than one Processor Group, should I set up the CacheServer and CacheClientServices to use a different port for each processor group? (Of course each pair must have the same port number.) If I set up a CacheServer by clicking in the upper left hand corner and selecting Controller Settings, do I want that to use a port different than any CacheServer I set up in a Processor Group?

Am I barking up the wrong tree and in fact I normally only want to set up one CacheServer for the entire cluster?

*********

I understand that is a lot of questions, but I at least tried to be specific to the things I encountered. In any case, I suspect a lot of people might find the answers useful, so if you or anyone else could find the time to answer at least some of them, it would be most appreciated.

Thanks again

Master Mentor

@Kevin Lahey

1. Each NiFi node in a cluster runs its own copy of the flow.xml and processes its own set of FlowFiles. Nodes are unaware of what FlowFiles exist on other nodes in the cluster.


2. In much older versions of NiFi (Apache 0.x), NiFi did not have any high availability at the control level within a cluster. There was a dedicated NiFi instance known as the NiFi Cluster Manager (NCM). This was the only instance in the NiFi cluster that could be accessed, and all the nodes connected to this NCM. If the NCM went down, the entire NiFi cluster was unreachable. As of Apache NiFi 1.x, the NCM no longer exists; the cluster relies on ZooKeeper to elect one of the cluster nodes to handle the roles of Cluster Coordinator and Primary Node. If a currently elected node goes down, a new node is elected to that role. In this way HA at the control level is provided.

When you create any component (processor, controller service, reporting task, etc.), that component is replicated to all nodes in the cluster. So yes, the DistributedMapCacheServer controller service would be running on all nodes. If you then configure the DistributedMapCacheClientService to use "localhost", each node reads and writes to a different cache server. The DistributedMapCacheClientService should instead be configured to point at one specific node. As you can see, you have no HA in this type of setup, since you are dependent on the one node hosting the cache server always being up. Instead you should use one of the external cache options, like HBase, in order to have HA.
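The "localhost" pitfall above can be illustrated with a toy model. This is not NiFi code; the node names and functions are invented purely to show why four per-node caches defeat duplicate detection, while one shared cache restores it.

```python
# Toy model: each node has its own cache server. A client configured with
# "localhost" talks to its own node's server; a client configured with a
# specific hostname talks to that one shared server.
cache_servers = {"node1": set(), "node2": set(), "node3": set(), "node4": set()}

def detect_duplicate(node, cache_host, key):
    target = node if cache_host == "localhost" else cache_host
    cache = cache_servers[target]
    if key in cache:
        return "duplicate"
    cache.add(key)
    return "non-duplicate"

# With "localhost": every node consults its own empty cache, so the same
# filename is "new" on all four nodes.
localhost_results = [detect_duplicate(n, "localhost", "file.csv")
                     for n in cache_servers]
print(localhost_results)  # all "non-duplicate"

# Pointing every client at one node restores shared state: only the first
# lookup is new.
cache_servers = {k: set() for k in cache_servers}
central_results = [detect_duplicate(n, "node1", "file.csv")
                   for n in cache_servers]
print(central_results)  # "non-duplicate", then "duplicate" x3
```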


3. As explained above, there is no such thing as an NCM as of Apache NiFi 1.x.


4. Every component you add to the NiFi canvas runs within a single JVM on each NiFi node, so you cannot configure multiple components that bind to the same port anywhere. The first component will bind to the port, and when the other components are started they will throw an exception about the port already being in use. You can have as many clients (DistributedMapCacheClientService) as you like, since they act as clients and do not bind to a port. Only the server binds to the port, so it can listen for client requests.
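The port-binding failure described here is the ordinary OS-level one, and can be demonstrated outside NiFi. A minimal sketch (plain Python sockets, nothing NiFi-specific): the second listener on an already-bound port fails, which is what a second DistributedMapCacheServer on the same port would hit.

```python
import socket

# First listener: bind to an OS-assigned free port and start listening,
# like the first DistributedMapCacheServer that is enabled.
first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
first.bind(("127.0.0.1", 0))
first.listen()
port = first.getsockname()[1]

# Second listener on the same port: the bind fails with "address already
# in use", like a second cache server configured with the same port.
second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    second.bind(("127.0.0.1", port))
    second.listen()
    conflict = False
except OSError:
    conflict = True
finally:
    second.close()
    first.close()

print(conflict)  # True
```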


Hope this helps

Explorer

I think I understood all that. Thanks for taking the time to write it all out.

Explorer

@Shu

Thanks. That was it. I had assumed that since this was a development instance it was standalone, and just ignored all the mentions of clusters and Primary Nodes in the other posts. A quick call to nifi-api/flow/cluster/summary confirmed this is a cluster.
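For anyone else checking: the endpoint mentioned above returns a small JSON summary. The snippet below parses an abbreviated sample response; the field names follow the NiFi REST API as I understand it, but verify them against the docs for your NiFi version.

```python
import json

# Abbreviated sample of what GET /nifi-api/flow/cluster/summary can return
# (field names assumed from the NiFi REST API; values are illustrative).
sample_response = json.loads("""
{
  "clusterSummary": {
    "clustered": true,
    "connectedToCluster": true,
    "connectedNodes": "4 / 4"
  }
}
""")

summary = sample_response["clusterSummary"]
if summary["clustered"]:
    print("Clustered:", summary["connectedNodes"], "nodes connected")
else:
    print("Standalone instance")
```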

I do have a few more questions about DetectDuplicate processors, which I'll post in reply to Matt Clarke's post. If you could take a look, I'd appreciate it.

Thanks again.