
Flow runs on a single node in a NiFi cluster


I have a 2-node NiFi cluster with similar configuration on both nodes. I created a simple flow as follows:

1. Flow

15078-capture.png

2. getFromHDFS

15079-gethd.png

The ListHDFS processor, which creates the flow files, is set to run on the primary node only. It sends the flow files to an RPG whose address is set to one of the nodes in the cluster. What I understood from the docs is that in clustered mode flow files get distributed across the nodes, and each node operates on some non-empty set of flow files received from the RPG. In my case, however, one node always gets 100% of the flow files, and in a few cases the other node gets just 1 flow file. This holds even for 1000 files of 100 MB each.

1.RPG

15083-graph.png

2. getHDFS group (supposed to run on each node)

15084-flow-file-in.png

How can I get GetHDFS to run in parallel on both machines, each operating on a different set of flow files?


1 ACCEPTED SOLUTION

Master Mentor

@ismail patel

When the Remote Process Group was originally written, the list and fetch type processors did not exist and NiFi was primarily being used to process large files. For that workload, the design worked pretty well for load balancing.

Here is how it works:

1. The RPG connects to the target NiFi cluster.

2. The target NiFi returns a list of available peer nodes along with their current load.

3. The RPG creates a distribution allocation. (You will see that distribution in the logs in the form of percentages: Node 1 50%, Node 2 50%.)

4. The RPG connects to node 1 and sends data from its incoming queue for 5 seconds, then connects to node 2 and sends for 5 seconds, and so on.

Let's assume a distribution as follows:

The target is a 4-node cluster, and based on load the following distribution was calculated:

Node 1 -- 10%

Node 2 -- 20%

Node 3 -- 20%

Node 4 -- 50%

As a result, the RPG would connect in this pattern to send the data:

Node 1, Node 2, Node 3, Node 4, Node 2, Node 3, Node 4, Node 4, Node 4, Node 4.

With a high-volume dataflow, or one dealing with larger files, this balances out nicely over time. For a low-volume flow, or a flow dealing with very small files, it does not work so well.

In your case you have a bunch of small files (all 0 bytes), so within the first 5-second window every one of them is sent to node 1, as the sketch below illustrates.
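To make the timing effect concrete, here is a rough Python sketch of that send loop. It is a toy model, not NiFi's actual implementation: connection_order, simulate, and files_per_window are made-up names, and expanding the allocation into one slot per 10% is only an approximation of how the distribution could be realized.

# A toy model (not NiFi's actual code) of how an RPG drains its queue:
# peers are picked from a weighted allocation, and data is sent to one
# peer at a time for a fixed window (about 5 seconds in NiFi).

from itertools import cycle

def connection_order(weights):
    # Expand a percentage allocation into a repeating pattern, one slot
    # per 10%: {"node1": 50, "node2": 50} -> node1 x5, node2 x5.
    slots = []
    for node, pct in weights.items():
        slots.extend([node] * (pct // 10))
    return slots

def simulate(num_files, weights, files_per_window):
    # Each peer gets a full sending window before the RPG moves on.
    remaining = num_files
    received = {node: 0 for node in weights}
    for node in cycle(connection_order(weights)):
        if remaining == 0:
            break
        sent = min(files_per_window, remaining)
        received[node] += sent
        remaining -= sent
    return received

# 1000 tiny (0-byte) listing FlowFiles: a single 5-second window fits
# all of them, so the first peer in the pattern receives everything.
print(simulate(1000, {"node1": 50, "node2": 50}, files_per_window=5000))
# -> {'node1': 1000, 'node2': 0}

# Large files mean only a few transfers fit per window, so the pattern
# cycles through the peers and the load evens out over time.
print(simulate(1000, {"node1": 50, "node2": 50}, files_per_window=30))
# -> close to an even split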

There are improvements coming to the RPG to set batch sizes and improve how the distribution of data occurs.

Thank you,

Matt


REPLIES


Master Mentor

@ismail patel

For now, you can force better load distribution in such a case by doing the following for your low-volume, 0-byte flow:

15085-screen-shot-2017-05-05-at-121244-pm.png

No need to change any configuration on the RouteOnAttribute processor. Simply connect the pre-existing "unmatched" relationship to your RPG.

Thanks,

Matt


This worked; it distributed files across the nodes. But why do we need RouteOnAttribute to apply back pressure? I connected ListHDFS directly to the RPG with a back pressure threshold of 5, and the flow files were distributed across the nodes.

Master Mentor

@ismail patel

Back pressure thresholds are soft limits, and some processors do batch processing. The ListHDFS processor produces a listing of files from HDFS and creates a single 0-byte FlowFile for each file in that listing. It then commits all of those FlowFiles to the success relationship at once. So even with the back pressure threshold set to 5, ListHDFS would still dump the entire listing onto the connection (even if it consisted of thousands of files). At that point back pressure would kick in and prevent ListHDFS from running again until the queue dropped back below 5, but that is not the behavior you need here.

The RouteOnAttribute processor works on one FlowFile at a time, which lets the flow adhere much more strictly to the back pressure threshold of 5 on its unmatched relationship. The fact that I used RouteOnAttribute is not important; any processor that works on FlowFiles one at a time would do. I picked RouteOnAttribute because it operates on FlowFile attributes, which live in heap memory, making the processing here very fast.
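Here is a small Python sketch of that difference. It is only an illustration of the scheduling behavior, not NiFi internals; batch_commit and per_file_commit are made-up stand-ins for how ListHDFS and RouteOnAttribute interact with a soft threshold.

THRESHOLD = 5   # back pressure object threshold on the connection

def batch_commit(listing, queue):
    # ListHDFS-style: back pressure is only checked before the processor
    # is scheduled, and the whole listing is committed in one session,
    # so a single run overshoots the threshold by the full batch size.
    if len(queue) < THRESHOLD:
        queue.extend(listing)

def per_file_commit(listing, queue):
    # RouteOnAttribute-style: one FlowFile per execution, so the queue
    # stops growing as soon as the threshold is reached.
    for flowfile in listing:
        if len(queue) >= THRESHOLD:
            break   # back pressure holds; the rest waits upstream
        queue.append(flowfile)

q_batch, q_single = [], []
batch_commit(range(1000), q_batch)
per_file_commit(range(1000), q_single)
print(len(q_batch), len(q_single))   # -> 1000 5

Keeping the queue near 5 means the RPG drains it within each sending window and has to wait for more, so successive windows (and therefore different nodes) each pick up a share of the FlowFiles.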

Thanks,

Matt

Master Mentor

@ismail patel

The ability to set a FlowFile batch size is coming in Apache NiFi 1.2.0, which should be up for a vote any day now.

https://issues.apache.org/jira/browse/NIFI-1202

Thanks,

Matt


Thanks, Matt, this is really helpful.

I tried a flow where a GenerateFlowFile processor generates 100 files of 1 GB each and is connected to the RPG. The RPG graph shows both node 1 and node 2, but the files always end up on node 1. I disconnected node 1, and the part of the flow that was supposed to run in parallel stopped. About 80 GB of flow files accumulated in the connection between GenerateFlowFile and the RPG, and even though node 2 was up, it did not take any files from the queue. When node 1 was reconnected, it resumed consuming flow files from the queue.

Master Mentor

@ismail patel

Nodes in a NiFi cluster really don't know about each other. They find out from ZooKeeper who the elected cluster coordinator is and send that coordinator health and status messages via heartbeats. Each node in the cluster runs its own copy of the dataflow and works on its own set of FlowFiles. When a node goes down, the other nodes do not pick up that node's FlowFiles and work on them.
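A minimal sketch of that ownership model in Python (the node names and queue sizes below are made up, chosen to mirror your 80 GB backlog):

# Each node owns its own FlowFile repository; nothing is shared between nodes.
cluster = {
    "node1": {"connected": False, "queued_gb": 80},  # disconnected, backlog stuck
    "node2": {"connected": True,  "queued_gb": 0},
}

for name, node in cluster.items():
    if node["connected"]:
        # A node only ever works on the FlowFiles it holds locally.
        print(f"{name} processes its own {node['queued_gb']} GB of queued data")
    else:
        # A disconnected node's queue simply waits for it to rejoin.
        print(f"{name} is down; its {node['queued_gb']} GB backlog waits for reconnect")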

I am not following exactly what you did above. If you disconnect a node from the cluster, its health and status heartbeats will no longer be coming in, and any data queued on that node will not be reflected in the cluster view until the node is reconnected.

If the RPG was not sending FlowFiles to Node 2, there is likely an issue with the connection.

Make sure the Site-to-Site (S2S) properties in the nifi.properties file are configured correctly on both nodes and that no firewall is blocking the connection.
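As a quick sanity check, something like the following Python snippet (an assumed helper, not an official NiFi tool; adjust the nifi.properties path for your install) reads the standard S2S keys and tests whether the port is reachable:

import socket

def read_properties(path):
    # Parse simple key=value lines, skipping comments and blanks.
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                props[key.strip()] = value.strip()
    return props

props = read_properties("/opt/nifi/conf/nifi.properties")  # adjust per node
host = props.get("nifi.remote.input.host") or "localhost"
port = props.get("nifi.remote.input.socket.port")

if not port:
    print("nifi.remote.input.socket.port is unset; RAW site-to-site will not work")
else:
    # A plain TCP connect from the sending node shows whether a firewall
    # (or a wrong host/port) is in the way.
    with socket.create_connection((host, int(port)), timeout=5):
        print(f"S2S port {port} on {host} is reachable")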