Created 05-20-2025 08:10 AM
My NiFi pipeline fetches and processes files from an SFTP server.
When the number of files to fetch does not exceed 500, everything works well.
However, sometimes the number of files the pipeline has to list at once exceeds 2,000,000, which critically impacts performance.
I’m trying to find a way to throttle the file stream so that no more than 500 files are processed at any given time.
First, I looked into the GetSFTP processor. It has a convenient property called "Remote Poll Batch Size".
However, this deprecated processor has several disadvantages, the most significant being that it does not maintain state.
Because of this, when I set "Delete original" to False, it fetches files infinitely.
Setting "Delete original" to True is not desirable because my pipeline’s guaranteed delivery requirements mandate that files should only be deleted after successful processing (not at the start of the pipeline).
Next, I explored back pressure settings in the queue between ListSFTP and FetchSFTP.
For example, I set the back pressure threshold to 500 in the queue between these processors.
However, as I understand it:
When ListSFTP reads 2 million files, it pushes all of them into the queue.
ListSFTP can only add the next files to the queue once FetchSFTP has processed enough files to bring the count below the back pressure threshold (e.g., after 1,999,501 files have been consumed).
This behavior is not what I expected.
I wanted one of the following:
1. ListSFTP lists no more than 500 files at a time, or
2. The queue between ListSFTP and FetchSFTP holds no more than 500 files, or
3. FetchSFTP fetches no more than 500 files at a time.
Is there a way to solve this problem without creating a custom processor or using Groovy scripts?
Created 05-20-2025 10:07 AM
@AndreyDE
NiFi Connection Backpressure cannot trigger changes in the configuration of an upstream processor. Often the client library dictates what happens when the client is executed, and NiFi may have no control over the number of results returned. The NiFi scheduler, which is responsible for giving a processor component a thread to execute, looks at the downstream connection(s) coming off a processor; if any of them are applying backpressure, it will not give that processor a thread. Backpressure thresholds are therefore soft limits. So when ListSFTP gets a thread, it executes the SFTP client, which returns all files that match the filtering configuration.
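This soft-limit behavior can be sketched in a few lines of illustrative Python (not NiFi code; the 500 threshold and 2,000,000-file listing match the example above):

```python
# Sketch: why a backpressure threshold of 500 cannot cap what a single
# ListSFTP run adds to the queue. The scheduler checks the threshold only
# BEFORE granting a thread, never during a run.

def scheduler_permits_run(queue_depth, threshold):
    """Thread is granted only while the downstream queue is below threshold."""
    return queue_depth < threshold

def list_sftp_run(remote_file_count):
    """Once running, the SFTP client returns every matching file."""
    return remote_file_count  # the whole listing, not a capped batch

queue = 0
threshold = 500

# The queue is empty, so the processor is scheduled...
if scheduler_permits_run(queue, threshold):
    queue += list_sftp_run(2_000_000)  # ...and dumps the full listing at once

print(queue)                                    # 2000000 FlowFiles queued
print(scheduler_permits_run(queue, threshold))  # False: no new runs until drained
```

The threshold stops the *next* scheduling of ListSFTP, but the first run has already flooded the connection.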
Numbers 1 and 2 are not possible with ListSFTP due to the limitations described above.
For number 3 you have a couple of options:
A) You could place a ControlRate processor between ListSFTP and FetchSFTP to control the rate at which FlowFiles move between them. This throttles how quickly FlowFiles leave the connection fed by ListSFTP, giving each batch time to process before the next batch is passed downstream.
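A minimal sketch of the fixed-window throttling a ControlRate-style processor applies (illustrative Python, not NiFi's implementation; 500 FlowFiles per minute is an assumed setting):

```python
class RateLimiter:
    """Passes at most max_count items per window_seconds (fixed window)."""
    def __init__(self, max_count, window_seconds):
        self.max_count = max_count
        self.window = window_seconds
        self.window_start = None
        self.passed_in_window = 0

    def try_pass(self, now):
        # Start a new window when the old one has elapsed.
        if self.window_start is None or now - self.window_start >= self.window:
            self.window_start = now
            self.passed_in_window = 0
        if self.passed_in_window < self.max_count:
            self.passed_in_window += 1
            return True
        return False

limiter = RateLimiter(max_count=500, window_seconds=60)

# 2,000,000 FlowFiles are queued, but only 500 pass in the first minute.
released_first_minute = sum(limiter.try_pass(now=0) for _ in range(2_000_000))
print(released_first_minute)  # 500
```

The remaining FlowFiles simply wait in the connection; the limiter releases another 500 each time a new window opens.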
B) Have your ListSFTP connect to a child process group whose "FlowFile Concurrency" setting is configured as "Single FlowFile Per Node". You can then place your downstream processing dataflow inside this child process group, which will not allow another FlowFile to enter until the FlowFile already in the group has either been auto-terminated or has exited the process group. The "Single FlowFile Per Node" concurrency setting means you lose the ability to process multiple FlowFiles concurrently, which makes this a less desirable option.
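The admission rule behind "Single FlowFile Per Node" can be sketched as follows (illustrative only, not NiFi internals):

```python
class SingleFlowFilePG:
    """Admits a new FlowFile only when the group is empty."""
    def __init__(self):
        self.active = None

    def try_admit(self, flowfile):
        if self.active is not None:
            return False          # still busy with the previous FlowFile
        self.active = flowfile
        return True

    def finish(self):
        """The FlowFile was auto-terminated or exited the group."""
        self.active = None

pg = SingleFlowFilePG()
print(pg.try_admit("file-1"))  # True
print(pg.try_admit("file-2"))  # False: blocked until file-1 leaves
pg.finish()
print(pg.try_admit("file-2"))  # True
```

This is why option B serializes everything: at most one FlowFile is ever in flight inside the group.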
C) A combination of both A and B. ListSFTP feeds a ControlRate processor (configured to allow 500 FlowFiles per minute) that lets batches of 500 FlowFiles move from the ListSFTP connection queue to the connection feeding a child process group. Configure the backpressure threshold on the connection feeding the child PG to 500 as well, so backpressure is applied as soon as ControlRate allows a batch through. This backpressure prevents ControlRate from being scheduled again until the queued batch is consumed by the child process group. On the child process group, configure "FlowFile Concurrency" as well, but set it to "Single Batch Per Node", which allows the process group to consume all FlowFiles from the inbound connection at once. It will not consume from the inbound connection again until all FlowFiles have left the child process group. This design method controls the size of the batches being processed at one time in the child process group while still allowing concurrent execution of multiple FlowFiles by the processor components within it.
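Putting the pieces of option C together, the interaction between backpressure and "Single Batch Per Node" can be sketched like this (illustrative Python with an assumed batch size of 500):

```python
class SingleBatchPG:
    """Admits the entire inbound queue at once, then nothing until empty."""
    def __init__(self):
        self.in_flight = 0

    def try_consume(self, queue):
        if self.in_flight > 0 or not queue:
            return 0              # busy, or nothing to take
        self.in_flight = len(queue)
        taken = self.in_flight
        queue.clear()             # the whole batch enters the group at once
        return taken

    def complete_all(self):
        self.in_flight = 0

feeding_queue = []                # connection between ControlRate and child PG
BACKPRESSURE = 500

def control_rate_can_run():
    # The scheduler withholds the thread while the queue is at the threshold.
    return len(feeding_queue) < BACKPRESSURE

pg = SingleBatchPG()

if control_rate_can_run():
    feeding_queue.extend(range(500))   # ControlRate releases one batch

print(control_rate_can_run())          # False: backpressure engaged at 500
print(pg.try_consume(feeding_queue))   # 500: PG takes the whole batch at once
print(control_rate_can_run())          # True: queue drained, next batch may flow
print(pg.try_consume(feeding_queue))   # 0: PG won't consume again while busy
```

The 500 FlowFiles inside the group can then be processed concurrently, and the next batch is only staged once the connection empties and only admitted once the group empties.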
Option C dataflow would look something like this:
Here you can see no more than 500 being processed in the child process group at one time. Backpressure of 500 on the connection feeding the child process group prevents ControlRate from adding another 500 to that connection until it is emptied, which only happens when the child process group accepts the next batch, and that only happens when the FlowFile count inside the child process group hits 0. Inside the process group is where you build your dataflow to FetchSFTP and process the batches of FlowFiles however you need.
Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 05-20-2025 10:54 AM
Thanks a lot! That's a solution. For my case, ControlRate is enough, but I'll take the other options you suggested into account.
Created 05-20-2025 12:50 PM
Glad I could help. Using just ControlRate by itself has its limitations as well, because it does not optimize for performance/throughput. You are setting a fixed rate at which batches of FlowFiles will be passed downstream. Doing just this means multiple things can happen:
1. The rate is too short, resulting in additional batches being passed downstream before the previous batch has completed processing. This could potentially lead to large backlogs affecting downstream processing, just as you were experiencing previously.
2. The rate is too long, resulting in downstream processing of a batch completing well before ControlRate releases the next batch. This results in slower overall throughput.
3. If an exception occurs in downstream processing, nothing prevents additional batches from being released into that downstream flow, creating a huge backlog.
All of the above are handled by the slightly more complex option C.
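Point 1 can be illustrated with assumed numbers: if ControlRate releases 500 FlowFiles per minute but downstream processing only completes 300 per minute, the backlog grows without bound because nothing feeds back to pause the releases:

```python
# Assumed rates for illustration: ControlRate releases 500/min,
# downstream processing completes only 300/min.
release_per_min = 500
process_per_min = 300

backlog = 0
for minute in range(10):
    backlog += release_per_min             # ControlRate releases on schedule
    backlog -= min(backlog, process_per_min)  # downstream drains what it can

print(backlog)  # 2000: grows by 200 every minute with no feedback to stop it
```

Option C closes this feedback loop: the next batch is not released until the previous one has fully drained from the child process group.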
Thank you,
Matt