
NiFi Cluster with Remote Process Group

Contributor

I am working with a NiFi cluster. The tutorial suggests using a Remote Process Group with the cluster for load balancing.

But to use a Remote Process Group, we have to provide the URL of another NiFi instance, and that URL is hardcoded. So what happens if that NiFi instance goes down in the middle of workflow execution? Does the Remote Process Group still work in that situation, given that the load-balancing concept is implemented in the Remote Process Group?

1 ACCEPTED SOLUTION

Super Mentor
@Gaurav Jain

The URL provided when adding the Remote Process Group (RPG) to your canvas only needs to connect successfully when the RPG is initially added. Once a successful connection is established, the target instance returns a list of its currently connected cluster nodes, and the source instance with the RPG records those hosts in peer files. From that point forward the RPG constantly updates its list of available nodes; it will not only load-balance across those nodes but will also use any one of them to get an updated status. Even if your source instance of NiFi has trouble getting a status update from any of the nodes, it will still attempt load-balanced delivery, with failover, to the last known set of nodes until communication succeeds and it obtains an updated list.
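For context, each node in the target cluster advertises its own Site-to-Site endpoint through a handful of entries in its nifi.properties file; those advertised endpoints are what end up in the source instance's peer files. A minimal sketch, assuming a non-TLS setup with made-up hostname and port values:

    # nifi.properties on each node of the target cluster (values are hypothetical)
    # Hostname this node advertises to Site-to-Site clients
    nifi.remote.input.host=nifi-node1.example.com
    # Set to true if S2S traffic must be secured with TLS
    nifi.remote.input.secure=false
    # Port for the RAW socket Site-to-Site transport
    nifi.remote.input.socket.port=10000
    # Enable the HTTP(S) Site-to-Site transport as well
    nifi.remote.input.http.enabled=true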

In addition, NiFi allows you to specify multiple URLs in the RPG when you create it. Simply provide a comma-separated list of URLs for the nodes in the same target cluster. This does not change how the RPG works; it will still constantly retrieve a fresh listing of available nodes. This allows the target cluster to scale up or down without affecting your Site-To-Site (S2S) functionality.
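For example (hostnames here are made up for illustration), the URLs field of the RPG could be set to:

    https://nifi-node1.example.com:8443/nifi, https://nifi-node2.example.com:8443/nifi

Any one reachable URL in the list is enough for the RPG to bootstrap; it then learns the full, current set of peers from the cluster itself.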

Thanks,

Matt


11 REPLIES


Super Mentor

@Gaurav Jain

Each node in a cluster is responsible for working on its own FlowFiles. Each node is unaware of which FlowFiles the other nodes are working on. If a NiFi processor component is working on a FlowFile at the time the node goes down, that transformation work will start over once the node is running again. A node disconnecting from the cluster will not cause processing of FlowFiles to stop on the disconnected node.

Processors that transform FlowFile content produce a new FlowFile only once the transformation is complete. So if a failure occurs mid-processing, the original FlowFile remains in the processor's incoming queue and only the intermediate work is lost. This is how NiFi ensures no data loss occurs during unexpected failures.

That being said, data plane High Availability (HA) is one of NiFi's roadmap items.

Thanks,

Matt

Contributor

[attachment: flow.png]

Thanks Matt,

All of the above steps were helpful to me in setting up the cluster.

I have attached my workflow. It is configured so that GetSFTP executes on the primary node, followed by a Remote Process Group.

GetSFTP writes all of the file content into the primary node's local content repository.

So when the flow reaches the Remote Process Group, it distributes the FlowFiles to the nodes connected in the cluster.

Since each node reads content from its own local content repository, does the RPG write the content again into each receiving node's local content repository? If so, suppose GetSFTP retrieves multiple files totaling 2 TB and writes all of that content into the primary node's content repository. The RPG then distributes the FlowFiles among the different nodes, which again write the content into their respective repositories. That would be an overhead in both space and time.

Super Mentor

@Gaurav Jain

This is the exact use case for which GetSFTP was deprecated in favor of the ListSFTP and FetchSFTP processors.

The ListSFTP processor runs on the primary node only. It produces one 0-byte FlowFile for every file in the listing. All of these 0-byte FlowFiles are then sent to an RPG for distribution across the cluster. The distributed FlowFiles are then fed to a FetchSFTP processor, which retrieves the content from the SFTP server and inserts it into the FlowFile at that time. This model eliminates the overhead on the primary node, since it does not need to write the content, and it reduces network overhead between nodes, since no content is sent in the FlowFiles via the RPG.
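As a rough sketch of that flow (hostname and path values are hypothetical):

    ListSFTP  (scheduled to run on the Primary Node only)
        Hostname:    sftp.example.com
        Remote Path: /data/incoming
          |
          v   0-byte FlowFiles carrying attributes such as path and filename
    Remote Process Group  -->  Input Port (on this same cluster)
          |
          v   FlowFiles now spread across all nodes
    FetchSFTP  (runs on every node)
        Hostname:    sftp.example.com
        Remote File: ${path}/${filename}

Setting FetchSFTP's Remote File property to ${path}/${filename} is the usual convention, since ListSFTP records each file's location in exactly those attributes; whichever node receives a FlowFile then pulls that file's bytes itself.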

The only issue you are going to run into is:

https://issues.apache.org/jira/browse/NIFI-1202

This issue is addressed in Apache NiFi 1.2.0, which was just released this week.

It will also be addressed in HDF 3.0, which will be released soon.

You can work around the issue in older versions by setting a small object back-pressure threshold on the connection feeding your RPG. Since back pressure is a soft limit, you need to put a processor between your ListSFTP processor and the RPG that processes FlowFiles only one at a time. I recommend RouteOnAttribute (no configuration is needed on the processor; simply route its one existing "unmatched" relationship to the RPG and set back pressure on that connection).
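To make the workaround concrete, a sketch (the threshold value of 1 is illustrative; any small value works):

    ListSFTP  -->  RouteOnAttribute  --("unmatched")-->  Remote Process Group

    On the RouteOnAttribute -> RPG connection:
        Back Pressure Object Threshold: 1

With no user-defined properties added, RouteOnAttribute sends every FlowFile out through its built-in "unmatched" relationship one at a time, so the back-pressure threshold on that final connection is actually respected.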

Thanks,

Matt

Contributor

Hi Matt,

Can you briefly explain what this line means exactly: "The distributed FlowFiles are then fed to a FetchSFTP processor, which retrieves the content from the SFTP server and inserts it into the FlowFile at that time"?

Super Mentor

@Gaurav Jain

Here is an article I wrote a while ago that explains the differences between using the GetSFTP processor and the ListSFTP and FetchSFTP processors:

https://community.hortonworks.com/articles/97773/how-to-retrieve-files-from-a-sftp-server-using-nif....

Thanks,

Matt

Contributor

@Matt Clarke

Thanks Matt

Contributor

@Matt Clarke

If one FlowFile is acquired by a processor, is all of the processing done on that node only, or is there an option to distribute it to other nodes when working in a cluster?

Super Mentor

@Gaurav Jain

When you find an answer in Hortonworks Community Connections (HCC) that addresses your question, please accept that answer so that other HCC users know what worked for you.

Thank you kindly,

Matt