Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Role of primary node

avatar
Expert Contributor

I came across this:

Starting with the NiFi 1.0 release, a Zero-Leader Clustering paradigm is employed. Each node in a NiFi cluster performs the same tasks on the data, but each operates on a different set of data. Apache ZooKeeper elects a single node as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. Additionally, every cluster has one Primary Node, also elected by ZooKeeper.

If nifi follows zero-leader paradigm, then what is the purpose of primary node? And how does it differ from other nodes?

1 ACCEPTED SOLUTION

avatar
Super Mentor

@manishg 

The elected cluster coordinator (elected by Zookeeper) is responsible for receiving and processing heartbeats from other nodes in the cluster. It handles the connecting, reconnecting, and manual disconnecting of NiFi nodes.    The Cluster coordinator is also responsible for replicating user request to all nodes in the cluster and get confirmation from those nodes that the request was completed successfully.  

Assume a 3 node cluster with following:
node1 - elected cluster coordinator
node2 - elected primary node
node3 

Role of Cluster Coordinator:
A user can access the NiFi cluster via any of the 3 node's URL.  So lets say a user logs in node3's UI.  When that user interacts with node 3 UI that request is proxied to the currently elected cluster coordinator node that in turn replicates the request all 3 nodes (example: add a processor, configure a processor, empty a queue, etc...).  If one of the nodes were to fail to complete the request, that node would get disconnected.  I may attempt to auto-reconnect later (In newer version of NiFi a connecting node can inherit the clusters flow and replace it local flow only if doing so would not result in dataloss.


Role of the Primary Node:
The elected primary node is responsible for scheduling the execution of any NiF component processor on the canvas that is configured for primary node only.  This is configured in a processor's configuration "scheduling" tab:

MattWho_0-1714749409775.png

MattWho_1-1714750242158.png

Primary node scheduled processors with display a "P" in the upper left corner as seen above.

NOTE: ONLY processors with no inbound connections should ever be set to "primary node" execution.  Doing so on processor with inbound connection can lead to FlowFiles becoming stuck in those connection when the elected primary node changes.

Not all protocols are "cluster" friendly, so primary node execution helps dataflow designers work around that limitation while still benefiting from having a multi-node cluster.  NiFi has numerous "List<XYZ>" and "Fetch<XYZ>" type processor typically used to handle non cluster friendly protocols.    I'll use ListFile and FetchFile as an example.  Let say our 3 node cluster as the sane network directory mounted to every node.  If I was to add the ListFile processor and leave it configured with "all nodes" execution and configure it to list files on that shared mount.  All three nodes in the NiFi cluster would produce FlowFiles for all the files listed (so you have files in triplicate).  Now if i were to configure my ListFile with "primary node" execution, the listFile would only get scheduled to execute on the currently elected primary node (these processor also record cluster state in ZK so that if a elected primary node changes it doe snot result in a re-listing of the same files again).  To prevent overloading the primary node, the list based processors do not retrieve the source content.  It only creates a 0 byte FlowFile with attributes/metadata about the source file.   So the List based processor would then be connected downstream to its corresponding FetchFile processor.  The FetchFile for example would the use the metadata from the 0 byte FlowFile to fetch the content and add it to the FlowFile.  On the connection between ListFile and FetchFile you would configure cluster load balancing. 

MattWho_3-1714750357183.png

Here you can see I selected basic round robin.  You'll notice a connection with load balancing configured will also render a bit different:

MattWho_4-1714750452437.png

What happen on this connection is that all the 0 bytes FlowFiles will be redistributed in round robin style to all connected nodes.  Then on each node the FetchFile will get each nodes subset of FlowFiles content.  This reduce need to transmit content of network between nodes and reduces disk IO on primary node since it is not fetching all the content.  

If you search the Apache NiFi documentation you will see many list and fetch combination type processors.  But any source processor (one with no inbound connection) could be configured for primary node only.  But only schedule a source processor as primary node execution if required.  Doing so on processors like ConsumeKafka for example that uses cluster friendly protocols would just impact performance.

Hope this answers your question only what the difference is between Cluster Coordinator and Primary Node roles in a NiFi cluster.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

View solution in original post

1 REPLY 1

avatar
Super Mentor

@manishg 

The elected cluster coordinator (elected by Zookeeper) is responsible for receiving and processing heartbeats from other nodes in the cluster. It handles the connecting, reconnecting, and manual disconnecting of NiFi nodes.    The Cluster coordinator is also responsible for replicating user request to all nodes in the cluster and get confirmation from those nodes that the request was completed successfully.  

Assume a 3 node cluster with following:
node1 - elected cluster coordinator
node2 - elected primary node
node3 

Role of Cluster Coordinator:
A user can access the NiFi cluster via any of the 3 node's URL.  So lets say a user logs in node3's UI.  When that user interacts with node 3 UI that request is proxied to the currently elected cluster coordinator node that in turn replicates the request all 3 nodes (example: add a processor, configure a processor, empty a queue, etc...).  If one of the nodes were to fail to complete the request, that node would get disconnected.  I may attempt to auto-reconnect later (In newer version of NiFi a connecting node can inherit the clusters flow and replace it local flow only if doing so would not result in dataloss.


Role of the Primary Node:
The elected primary node is responsible for scheduling the execution of any NiF component processor on the canvas that is configured for primary node only.  This is configured in a processor's configuration "scheduling" tab:

MattWho_0-1714749409775.png

MattWho_1-1714750242158.png

Primary node scheduled processors with display a "P" in the upper left corner as seen above.

NOTE: ONLY processors with no inbound connections should ever be set to "primary node" execution.  Doing so on processor with inbound connection can lead to FlowFiles becoming stuck in those connection when the elected primary node changes.

Not all protocols are "cluster" friendly, so primary node execution helps dataflow designers work around that limitation while still benefiting from having a multi-node cluster.  NiFi has numerous "List<XYZ>" and "Fetch<XYZ>" type processor typically used to handle non cluster friendly protocols.    I'll use ListFile and FetchFile as an example.  Let say our 3 node cluster as the sane network directory mounted to every node.  If I was to add the ListFile processor and leave it configured with "all nodes" execution and configure it to list files on that shared mount.  All three nodes in the NiFi cluster would produce FlowFiles for all the files listed (so you have files in triplicate).  Now if i were to configure my ListFile with "primary node" execution, the listFile would only get scheduled to execute on the currently elected primary node (these processor also record cluster state in ZK so that if a elected primary node changes it doe snot result in a re-listing of the same files again).  To prevent overloading the primary node, the list based processors do not retrieve the source content.  It only creates a 0 byte FlowFile with attributes/metadata about the source file.   So the List based processor would then be connected downstream to its corresponding FetchFile processor.  The FetchFile for example would the use the metadata from the 0 byte FlowFile to fetch the content and add it to the FlowFile.  On the connection between ListFile and FetchFile you would configure cluster load balancing. 

MattWho_3-1714750357183.png

Here you can see I selected basic round robin.  You'll notice a connection with load balancing configured will also render a bit different:

MattWho_4-1714750452437.png

What happen on this connection is that all the 0 bytes FlowFiles will be redistributed in round robin style to all connected nodes.  Then on each node the FetchFile will get each nodes subset of FlowFiles content.  This reduce need to transmit content of network between nodes and reduces disk IO on primary node since it is not fetching all the content.  

If you search the Apache NiFi documentation you will see many list and fetch combination type processors.  But any source processor (one with no inbound connection) could be configured for primary node only.  But only schedule a source processor as primary node execution if required.  Doing so on processors like ConsumeKafka for example that uses cluster friendly protocols would just impact performance.

Hope this answers your question only what the difference is between Cluster Coordinator and Primary Node roles in a NiFi cluster.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt