Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Duplicate of flowfile after NiFi primary node re-election

avatar

I have a NiFi job starting with a GetFile processor configured to run on primary node to avoid duplicate flowfiles.

After a NiFi node restart, causing a Primary node re-election, the GetFile processor has created two flowfiles (one expected and one duplicate).

I suppose that the GetFile was configured to run with the pre-election primary node and post-election one too, causing those two flowfiles (both flowfiles were processed by different nodes according to NiFi data provenance).

Is there a way to avoid this behavior, and is it a NiFi bug ?

NB: While the job was running, coordinator and primary nodes were the same.

Thanks a lot.

1 ACCEPTED SOLUTION

avatar
Super Mentor
@Benjamin Bouret

-

It is common for the cluster Coordinator and Primary Node to change from time to time in a NiFi cluster, so you need to careful when designing your flows that utilize processors running "Primary node" only to make sure processing can still continue when a switch occurs.

-

I am going to assume since you got duplicates here that the local directory you have your GetFile processor pointing at is mounted across all your NiFi nodes. In order to avoid duplicates you will need to use processors that support state. The GetFile processor is one of our original processors that was developed before state management was put in place. It has been deprecated in favor of the newer listFile and FetchFile processors. The ListFile processor has the ability to store state either local to each node (not shared for cases where each node is pulling from its own non shared directory) or cluster state (state is stored in zookeeper where same processor on every node has access to it). Cluster state here would allow you to run this processor against a shared mount to all you nodes in "Primary node" only setup. If primary node changes the new primary node will start this processor and pull the last known recorded cluster state before performing a new listing. This should greatly reduce the likelihood of seeing duplicates.

-

NiFi will favor duplicate data over lost data. So there will still exist a small window of opportunity where duplication could occur. For example original primary node ingested data but some network issue for example prevented last state to be written to zookeeper. The new node would then not get the most current state which may result in duplication.

-

The list/fetch processor model also allows you to spread the workload across your cluster more easily. A flow would consist of:

listFile (Scheduled primary node only) --> Remote Process group (configured to point back at cluster to redistributed listed files) ---> fetchFile ( running on all nodes to retrieve content of listed files) --> rest of flow...

-

Thanks,

Matt

-

If you found this answer addressed your question, please take a moment to login to the forum and click "accept" on the answer.

View solution in original post

2 REPLIES 2

avatar
Super Mentor
@Benjamin Bouret

-

It is common for the cluster Coordinator and Primary Node to change from time to time in a NiFi cluster, so you need to careful when designing your flows that utilize processors running "Primary node" only to make sure processing can still continue when a switch occurs.

-

I am going to assume since you got duplicates here that the local directory you have your GetFile processor pointing at is mounted across all your NiFi nodes. In order to avoid duplicates you will need to use processors that support state. The GetFile processor is one of our original processors that was developed before state management was put in place. It has been deprecated in favor of the newer listFile and FetchFile processors. The ListFile processor has the ability to store state either local to each node (not shared for cases where each node is pulling from its own non shared directory) or cluster state (state is stored in zookeeper where same processor on every node has access to it). Cluster state here would allow you to run this processor against a shared mount to all you nodes in "Primary node" only setup. If primary node changes the new primary node will start this processor and pull the last known recorded cluster state before performing a new listing. This should greatly reduce the likelihood of seeing duplicates.

-

NiFi will favor duplicate data over lost data. So there will still exist a small window of opportunity where duplication could occur. For example original primary node ingested data but some network issue for example prevented last state to be written to zookeeper. The new node would then not get the most current state which may result in duplication.

-

The list/fetch processor model also allows you to spread the workload across your cluster more easily. A flow would consist of:

listFile (Scheduled primary node only) --> Remote Process group (configured to point back at cluster to redistributed listed files) ---> fetchFile ( running on all nodes to retrieve content of listed files) --> rest of flow...

-

Thanks,

Matt

-

If you found this answer addressed your question, please take a moment to login to the forum and click "accept" on the answer.

avatar
Explorer

@mattclarke

I'm having the same problem using "ScrollElasticSearchHttp" processor.

Processor state shows one or more nodes of cluster, depending on situation, even when I've configured to "Primary Node only".

Flowfiles have been duplicated on each added node.

How can I solve the problem?