NiFi: All files are processed by SFTP processor on Primary node

Expert Contributor

Hi Guys,

I have a flow that ingests files from an SFTP server: ListSFTP (primary node) -> FetchSFTP (all nodes).

I uploaded 20 files (5 MB each) to the SFTP server and found that all of them were fetched and processed by the primary node.

I was wondering how to get the tasks evenly distributed across all nodes, rather than only the primary node.

Is it because 20 files is too small a number, or because each file is too small?

Thanks.



Master Guru

In order to distribute the fetch across the cluster, you will need ListSFTP -> Remote Process Group (RPG). The RPG should send to an Input Port on the same cluster, and that Input Port can be connected to FetchSFTP. This RPG -> Input Port connection will distribute the flow files containing the file names across the cluster, and each node's Input Port will receive a subset of those flow files, which will then be fetched in parallel across the cluster.

Expert Contributor

Hi @Matt Burgess

Thank you for your response.

I set up an RPG and an Input Port.

It seems the RPG can connect to the remote NiFi instance.

However, when I click Manage Remote Ports, it doesn't display the input port I added.

BTW, I use a secured 3-node cluster with S2S enabled.

Please see the attached screenshot.

Did I miss anything?

Thanks.

39842-rpg.png


@Alvin Jin

Since you are on a secure cluster, you need to add policies to authorize the cluster to use S2S. You need to create policies that authorize your nodes to retrieve S2S details and to receive data via S2S. See option 2 of this article for how to do it: https://community.hortonworks.com/content/kbentry/88473/site-to-site-communication-between-secured-h...

Read this article on the List/Fetch design pattern to better understand what you are implementing: https://pierrevillard.com/2017/02/23/listfetch-pattern-and-remote-process-group-in-apache-nifi/

Expert Contributor

Hi @Abdelkrim Hadjidj

Thank you for your response. Very helpful.

I do have the "retrieve site-to-site details" policy set for all nodes in the cluster.
And the RPG can connect and refresh.

However, the "Retrieve Data via site-to-site" item is greyed out and disabled for the Input Port.

Then I found the document below.

https://community.hortonworks.com/questions/67255/how-can-i-set-an-inputoutput-ports-for-a-remote-pr...

It says the Input Port must be at the root level to enable S2S.

Is that a limitation?

What if I want to put my entire flow in a process group?

BTW, after S2S is enabled, I still find the file distribution is not even. One node processes more than the others.

Is there a way to control the distribution?

Thanks.

39865-s2s.jpg


Hi @Alvin Jin

Happy that this information was helpful.

Yes, you need to add your input port at the root canvas. If you need to work with a process group, you still need an input port at the root level. Then add an input port inside your process group and connect the root input port to the process group:

41432-screen-shot-2017-10-19-at-93618-pm.png

If you have nested process groups, do the same thing at each level to connect your external input port all the way down to your innermost process group.

Regarding data distribution, you should know that S2S batches data for better performance. It tries to send as much data as possible to one node before moving to the next one. This is very efficient, but if you have only a few flow files (when testing, for example), all of them will go to a single node.

Basically, S2S opens a connection to a node and tries to send data for x milliseconds. If that is not enough to send everything, it moves to the next node. The other parameters are flow file count and size.

These parameters can be defined globally. In recent versions of NiFi you can also define them for each input port, which is very convenient. The screenshots below show how to configure this (instructions in red).

41433-screen-shot-2017-10-19-at-94308-pm.png

41434-screen-shot-2017-10-19-at-94319-pm.png

41435-screen-shot-2017-10-19-at-94328-pm.png

You can set small values to test load balancing.
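To make the batching effect concrete, here is a minimal sketch in Python. It is a toy model, not NiFi's actual implementation: the function name `distribute`, the `batch_count` parameter, and the strict round-robin order between nodes are assumptions made for illustration only (real S2S also takes batch size, batch duration, and node load into account).

```python
from itertools import cycle


def distribute(num_files, nodes, batch_count):
    """Toy model of S2S batching (hypothetical, for illustration only).

    Each transaction sends up to `batch_count` flow files to one node,
    then the sender moves on to the next node in round-robin order.
    Returns a dict mapping each node to the number of files it received.
    """
    if batch_count < 1:
        raise ValueError("batch_count must be at least 1")
    received = {node: 0 for node in nodes}
    remaining = num_files
    for node in cycle(nodes):
        if remaining == 0:
            break
        sent = min(batch_count, remaining)  # one batch per transaction
        received[node] += sent
        remaining -= sent
    return received


if __name__ == "__main__":
    cluster = ["node1", "node2", "node3"]
    # A large batch limit: the whole test set fits in a single transaction.
    print(distribute(20, cluster, batch_count=100))
    # A small batch limit: the sender rotates through the nodes.
    print(distribute(20, cluster, batch_count=2))
```

In this model, 20 files with a batch limit of 100 all land on the first node, while a batch limit of 2 spreads them 8/6/6 across the three nodes. That matches the behaviour described above: with a small test set and default batching, a single node can end up with everything.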

Please close the thread if this answers your question. If you have further questions on load balancing, please ask a new question. This will be helpful for other users.

Thanks

Expert Contributor

@Abdelkrim Hadjidj

Perfect! Much clearer now.

Thanks.

Master Mentor
@Alvin Jin

NiFi Remote Input and Output Ports can only be added at the root canvas level. Based on the screenshot you provided above, you have created your "sftp" input port in a sub process group.

Input and output ports exist to move FlowFiles between a process group and ONE LEVEL UP from that process group only. Input ports will accept FlowFiles coming from one level up, and output ports allow FlowFiles to be sent one level up. You can only move FlowFiles up or down one level at a time.

At the top level of your canvas (root process group level), adding input or output ports provides the ability for that NiFi to receive FlowFiles from another NiFi instance (input port) or to have another NiFi pull FlowFiles from it (output port). We refer to input and output ports added at the top level as remote input or output ports. While the same input and output icon in the UI is used to add both remote and embedded input and output ports, you will notice that they are rendered differently when added to the canvas.

A Remote Input port (added to root canvas level) will appear as follows:

39877-screen-shot-2017-10-19-at-92552-am.png

While a local input port (added within a process group) will appear as follows:

39878-screen-shot-2017-10-19-at-92723-am.png

Thank you,

Matt