Created on 05-05-2017 03:34 PM - edited 08-17-2019 07:10 PM
I have a 2-node NiFi cluster with identical configuration. I created a simple flow:
1. Flow
2. getFromHDFS
The ListHDFS processor, which creates the flow files, is set to run on the primary node only and sends data to an RPG whose URL points to one of the nodes in the cluster. What I understood from the docs is that in clustered mode the flow files get distributed across the nodes and each node operates on some non-empty set of flow files received from the RPG. In my case, however, one node always gets 100% of the flow files, and in a few cases the other node gets just 1 flow file. This is true even for 1000 files of 100 MB each.
1. RPG
2. getHDFS group (supposed to run on each node)
How can I get getHDFS to run in parallel on both machines, each operating on a different set of flow files?
Created 05-05-2017 04:04 PM
When the Remote Process Group was originally written, the list and fetch type processors did not exist and NiFi was primarily being used to process large files. Based on that, the design worked pretty well for load balancing.
Here is how it works:
1. RPG connects to target NiFi cluster.
2. Target NiFi returns a list of available peer nodes along with their current load.
3. RPG creates a distribution allocation. (You will see that distribution in the logs in the form of percentages, e.g. Node 1 50%, Node 2 50%.)
4. RPG connects to Node 1 and sends data from the incoming queue for 5 seconds, then connects to Node 2 and sends for 5 seconds.
Let's assume the target is a 4-node cluster and, based on load, the following distribution was calculated:
Node 1 -- 10%
Node 2 -- 20%
Node 3 -- 20%
Node 4 -- 50%
As a result the RPG would connect in this pattern to send the data:
Node 1, Node 2, Node 3, Node 4, Node 2, Node 3, Node 4, Node 4, Node 4, Node 4.
With a high-volume dataflow, or one dealing with larger files, this balances out nicely over time. For a low-volume flow, or a flow dealing with very small files, it does not work so well.
In your case, you have a bunch of small files (all 0 bytes), so within the 5 seconds every one of them is sent to Node 1.
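To picture how that allocation turns into the connection pattern above, here is a rough sketch in Python (this is not NiFi's actual code, just an illustration of the idea: each node gets connection slots in proportion to its percentage, and the RPG cycles through the nodes that still have slots left):

def connection_pattern(percentages, slots=10):
    # percentages: node name -> load-based percentage, summing to 100.
    # Each node gets a share of the connection slots proportional to its percentage.
    remaining = {node: round(pct / 100 * slots) for node, pct in percentages.items()}
    pattern = []
    # Keep cycling through the nodes, taking one slot per node per pass,
    # until every node has used up its allocation.
    while any(remaining.values()):
        for node in remaining:
            if remaining[node] > 0:
                pattern.append(node)
                remaining[node] -= 1
    return pattern

print(connection_pattern({"Node 1": 10, "Node 2": 20, "Node 3": 20, "Node 4": 50}))
# ['Node 1', 'Node 2', 'Node 3', 'Node 4', 'Node 2', 'Node 3', 'Node 4', 'Node 4', 'Node 4', 'Node 4']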
There are improvements coming to the RPG to set batch sizes and improve how the distribution of data occurs.
Thank you,
Matt
Created on 05-05-2017 04:14 PM - edited 08-17-2019 07:10 PM
For now you can force a better load distribution in such a case for your low-volume flow of 0-byte FlowFiles: insert a RouteOnAttribute processor between ListHDFS and the RPG, and set a low back pressure object threshold on the connection feeding the RPG.
No need to change any configuration on the RouteOnAttribute processor. Simply connect the pre-existing "unmatched" relationship to your RPG.
Thanks,
Matt
Created 05-08-2017 10:29 AM
This worked; it distributed files across the nodes. But why do we need RouteOnAttribute to set back pressure? I connected ListHDFS directly to the RPG, set back pressure to 5, and the flow files were distributed across the nodes.
Created 05-08-2017 12:07 PM
Back pressure thresholds are soft limits, and some processors do batch processing. The ListHDFS processor produces a listing of files from HDFS and creates a single 0-byte FlowFile for each file in that list. It then commits all of those FlowFiles to the success relationship at once. So even if the back pressure threshold were set to 5, the ListHDFS processor would still dump the entire listing onto the connection (even if the listing consisted of thousands of files). At that point back pressure would be applied and prevent ListHDFS from running again until the queue dropped back below 5, but that is not the behavior you need here.
The RouteOnAttribute processor is one of those processors that works on one FlowFile at a time. This allows it to adhere much more strictly to the back pressure setting of 5 on its unmatched relationship. The fact that I used a RouteOnAttribute processor is not important; any processor that works on FlowFiles one at a time would do. I picked RouteOnAttribute because it operates on FlowFile attributes, which live in heap memory, so the processing here is very fast.
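As a rough illustration of why the soft threshold behaves so differently for a batch-committing processor versus a one-at-a-time processor, here is a small sketch in plain Python (not NiFi code):

THRESHOLD = 5
listing = ["flowfile-%04d" % i for i in range(1000)]   # one 0-byte FlowFile per file in the HDFS listing

# ListHDFS-style: back pressure is only checked before the processor runs,
# and the whole listing is committed to the outbound connection in one session.
batch_queue = []
if len(batch_queue) < THRESHOLD:        # check passes because the queue is empty...
    batch_queue.extend(listing)         # ...then all 1000 FlowFiles land at once
print(len(batch_queue))                 # 1000 -- the threshold is badly overshot

# RouteOnAttribute-style: one FlowFile per invocation, so the check is
# re-evaluated for every FlowFile and the queue stops right at the threshold.
single_queue = []
for flowfile in listing:
    if len(single_queue) >= THRESHOLD:  # back pressure applied; stop scheduling
        break
    single_queue.append(flowfile)
print(len(single_queue))                # 5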
Thanks,
Matt
Created 05-05-2017 04:31 PM
The ability to set a FlowFile batch size is coming in Apache NiFi 1.2.0, which should be up for a vote any day now.
https://issues.apache.org/jira/browse/NIFI-1202
Thanks,
Matt
Created 05-05-2017 05:07 PM
Thanks, Matt, this is really helpful.
I tried a flow where a GenerateFlowFile processor generates 100 files of 1 GB each and is connected to the RPG. The RPG graphs show both node 1 and node 2, but the files always end up on node 1. I then disconnected node 1, and the part of the flow that is supposed to run in parallel stopped. About 80 GB of FlowFiles accumulated on the connection between GenerateFlowFile and the RPG, and even though node 2 was up it did not take any files from the queue. When node 1 was reconnected, it started consuming FlowFiles from the queue again.
Created 05-05-2017 05:51 PM
Nodes in a NiFi cluster really don't know about each other. They find out from ZooKeeper who the elected cluster coordinator is and start sending health and status messages via heartbeats. Each node in the cluster runs its own copy of the dataflow and works on its own set of FlowFiles. When a node goes down, the other nodes do not pick up that node's FlowFiles and work on them.
I am not following exactly what you did above. If you disconnect a node from the cluster, its health and status heartbeats will no longer be coming in. Any data queued on that node should not be reflected in the cluster view until the node is reconnected.
If the RPG was not sending FlowFiles to Node 2, there is likely an issue with the connection.
Make sure the Site-to-Site (S2S) properties in the nifi.properties file are configured correctly on both nodes and that there is no firewall blocking the connection.
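For reference, these are the Site-to-Site properties I mean in nifi.properties (the hostname and port below are placeholders only; use the values appropriate for your environment, and set them on every node):

# Site-to-Site properties in nifi.properties (example values only)
# This node's resolvable hostname:
nifi.remote.input.host=node1.example.com
# Set to true only if your cluster is secured with certificates:
nifi.remote.input.secure=false
# RAW transport port for S2S; must be reachable through any firewall:
nifi.remote.input.socket.port=10000
# Whether HTTP(S) transport is enabled for S2S:
nifi.remote.input.http.enabled=true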