Support Questions
Find answers, ask questions, and share your expertise
Alert: Please see the Cloudera blog for information on the Cloudera Response to CVE-2021-4428

NiFi - Load Distribution in GetFile Processor



Can someone please explain NiFi's behaviour in following scenario:

The cluster has 4 nodes. There is a GetFile processor polling every minute a shared folder containing thousands of files and running with 2 concurrent tasks. This translate to 8 running threads (as shown in following image):

  1. Since all the running instances are reading from a single shared folder, is there a possibility that multiple threads pickup the same file causing duplicate flow files? I know that by "keep source file"=false, we can avoid it. But what will happens when it's set to true?
  2. Is there a feature in NiFi framework that safeguards in such scenarios (where a processor tries to read a shared resource from multiple threads across nodes)? Or is it something developer has to handle themselves while writing a custom processor (lets say someone is writing a custom GetDropBoxFile processor). While writing a custom processor, how can we ensure we don't end up in data duplication or a race condition.
  3. Does Zk plays any role in such scenarios (maintaining global state of what needs to process, who will process, who is already processing)?




Hi @Manish Gupta,

I think you should use List/FetchFile processors. You would have one ListFile processor running on primary node only (defined in scheduling strategy), then use a remote process group to balance the flow files over your nodes and use the FetchFile processor to actually retrieve the files from the shared folder.

If you use GetFile processor, you may indeed run into troubles as you anticipated. This is why such kind of processors are decoupled in a List/Fetch pattern. At least this is the best practice and what is recommended.

Hope this helps.


Thank Pierre. So, as a best practice, if I am developing a new Get File processor (say from DropBox ), I should better create corresponding List/Fetch file processor?

Yes, unless you are able to deal with possible race condition programmatically. But it can be difficult to manage depending of what kind of processor you are developing. Since it is really easy to develop a custom processor, I'd encourage you to follow the List/Fetch pattern: it is easier to develop, and allows you to easily balance the load in a cluster by leveraging the NiFi framework.

Hi @Pierre Villard,

Would you please give more details about why should the FetchFile processor be in a "Remote process group" and not a simple "Process group" ? also can the remote process group be defined in the same cluster where the ListFile processor is defined ?

Thank you for your answer

Hi @Mohammed El Moumni

I believe there is a misunderstanding on the vocabulary here.

There is no such thing as being "in a Remote Process Group". A RPG is just an abstract object that you can put on your canvas with input relationships saying that all the flow files coming in the RPG will be load balanced and sent to the cluster indicated in the RPG and will be sent to the given remote input port (another abstract object to put on the canvas)

Again a RPG is just a symbolic link to another NiFi instance (cluster or standalone) and this remote instance can be the cluster itself. In fact, you MUST use a RPG pointing to the same cluster if you want to load balance flow files coming from a processor running on the primary node only.

In the case of List/Fetch processors, the recommended best practice is :

- ListX running on primary node only to avoid concurrent access to what is listed (file system, database, ftp, hdfs, etc)

- FetchX to actually get the data from the remote system

BUT if you link the two processors, since the ListX is running on the primary node only, the data will remain on the primary node only and the FetchX running in the other nodes of your cluster won't do anything. You have to use a RPG pointing to itself, so that flow files coming out of ListX are load balanced on all nodes and to have all the FetchX actually doing something.

Consequence is, you should have two flows:

ListX -> RPG pointing to same cluster and configured with "MyInputPort" as destination

Input Port "MyInputPort" -> FetchX

Hope this clarifies things.

This is an awesome answer. Thank you very much. Just tested it and it is working fine. Since it is the first time I use a remote process group, would you please tell me the difference between RAW and HTTP transport protocol ? (I should probably put this in another question)

Have a look here:

In particular:

By default, it is set to RAW which uses raw socket communication using a dedicated port. HTTP transport protocol is especially useful if the remote NiFi instance is in a restricted network that only allow access through HTTP(S) protocol or only accessible from a specific HTTP Proxy server. For accessing through a HTTP Proxy Server, BASIC and DIGEST authentication are supported.

So it is more a question about what are your network specifications.

Master Guru

@Manish Gupta I agree with Peirre's response above. Just to add to his response.

Nodes in a Nifi cluster are not aware of each other. They are simply running a dataflow and handling the files they have in their control. The GetFile processor when running as you described across multiple nodes against a shared resource will all try to connect based on the run schedule and obtain a listing of the files in the local directory. They will all then try to retrieve the files from that listing. So even with Keep Source File = false, you may still encounter file not found error son some of your nodes because another node consumed and deleted a file before another node did who also had the file in its initial listing. So it create a bit of a race condition here.

so the best approach is a listFile processor configured with a scheduling strategy of "On primary Node" feeding a Remote Process Group (RPG) as described by Pierre. The ListFile processor produces a single 0 byte Flowfile for each file it lists. Those 0 byte flowfiles have NiFi attributes on them that provide the details necessary for the FetchFile processor to retrieve the exact file. In addition the ListFile processor can be configured to store state across you cluster so that the same files are not listed more then once even if the processor is stopped and restarted. Storing the state across your cluster also allows you to switch at any time which of your nodes is the primary node.




Thanks Matt.

"Nodes in a Nifi cluster are not aware of each other." - is this true for concurrent tasks (running on the same node) as well? If this is the case, then increasing the concurrent task to something greater than 1 is always risky for any processor in any flow. The processor executing multiple threads will go in a race condition or generate duplicate flow files or or do the same flow file processing multiple times.

For example - if I run something as simple as ListenTCP -> ConvertJSONToAvro -> MergeContent -> PutFile on a single node cluster, and keep Concurrent Task to 1 for each of the processor except ConvertJSONToAvro, for which I keep the value to 2. Then ConvertJSONToAvro can process a single incoming JSON twice and generate 2 records.

Or, I am missing something?