Created 04-25-2024 11:01 AM
I am fetching files from a particular HDFS path and using a MergeContent processor to merge all the fetched files. Then I transfer them to an SFTP server using a PutSFTP processor. There are currently 20 files in the path, with a total size of 1.2 GB (this may vary in my production environment, up to roughly 300 GB).
Initially, my MergeContent processor picked up 1 GB of data (14 of the 20 files), merged it, and transferred the result to the SFTP server. Later, it picked up the remaining 0.2 GB (the other 6 files) and transferred a second merged file to my SFTP server.
I updated the queue size limit to 2 GB on the MergeContent processor's incoming connection, and it then merged all 20 files and copied a single 1.2 GB file at once.
In another flow, I have a FetchHDFS --> PutSFTP pair, which copies a single file larger than 100 GB to the SFTP server. The Back Pressure Size Threshold there is set to 1 GB, and it works. I am wondering why it does not work the same way for the MergeContent processor.
Could you please advise on the appropriate configuration settings for the MergeContent processor? Every day, my total file size may vary from 10 GB to 300 GB.
Created 04-26-2024 03:04 AM
Here is the configuration for my MergeContent Processor
Created 04-26-2024 06:02 AM
@s198
Back pressure thresholds are configured on NiFi connections between processors. There are two types of back pressure thresholds:
1. Object Threshold - Back pressure is applied once the Number of FlowFiles reaches or exceeds the setting (default is 10,000 FlowFiles). Applied per node and not across all nodes in a NiFi cluster.
2. Size Threshold - Back pressure is applied once the total data size of queued FlowFiles reaches or exceeds the setting (default is 1 GB). Applied per node and not across all nodes in a NiFi cluster.
When back pressure is being applied on a connection, it prevents the immediate processor that feeds data into that connection from being scheduled to execute until the back pressure is no longer being applied.
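For reference, both thresholds are set on each connection individually (in the connection's configuration dialog). A rough sketch of the relevant fields, using the values discussed in this thread (your values may differ):

    Back Pressure Object Threshold     : 10000   (count of queued FlowFiles, per node)
    Back Pressure Data Size Threshold  : 1 GB    (total size of queued FlowFiles, per node)

Raising only the Data Size Threshold on the connection feeding your MergeContent (for example to 2 GB, as you did) lets more data queue before the upstream processor is paused.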
Since back pressure is a soft limit, this explains your two different scenarios:
1. 20 FlowFiles were transferred to the connection feeding your MergeContent processor. Initially that connection was empty, so no back pressure was applied. The preceding processor kept adding FlowFiles to that connection until the Size Threshold of 1 GB was reached, at which point back pressure was applied, preventing the preceding processor from being scheduled and processing the remaining 6 files. The Max Bin Age set on your MergeContent processor then forced the bin containing the first 14 FlowFiles to merge after 5 minutes, which relieved the back pressure and allowed the next 6 files to be processed by the upstream processor.
2. The connection between the FetchHDFS and PutSFTP processors had no back pressure being applied (neither the object threshold nor the size threshold had been reached or exceeded), so FetchHDFS was scheduled to execute. That execution produced a single FlowFile larger than the 1 GB size threshold, so back pressure was applied as soon as that 100 GB file was queued. As soon as PutSFTP successfully executed and moved the FlowFile to one of its downstream relationships, FetchHDFS would have been allowed to be scheduled again.
There are also processors that emit batches of FlowFiles in a single execution. The list- and split-based processors like ListFile and SplitContent are good examples. It is possible for a ListFile execution to produce a listing in excess of the 10,000 FlowFile object threshold. Since no back pressure is being applied at that moment, the execution will succeed and create all 10,000+ FlowFiles, which are transferred to the downstream connection. Back pressure will then be applied until the number of queued FlowFiles drops back below the threshold; as soon as it drops to 9,999, back pressure is lifted and ListFile is allowed to execute again.
In your MergeContent example you made the correct edit to the size threshold to allow more FlowFiles to queue in the connection upstream of your MergeContent. If you left the downstream connection containing the "merged" relationship at the default size threshold, back pressure would be applied as soon as the merged FlowFile was added to that connection, since its merged size exceeds the 1 GB default size threshold.
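To make that concrete for this flow, a minimal sketch of the two connections around MergeContent, assuming today's ~1.2 GB batch is merged into a single FlowFile (the 2 GB values are illustrative; size them above your largest expected batch and merged file):

    FetchHDFS --> MergeContent connection:
        Back Pressure Object Threshold     : 10000
        Back Pressure Data Size Threshold  : 2 GB    (must be able to hold every file you want in one bin)

    MergeContent "merged" --> PutSFTP connection:
        Back Pressure Object Threshold     : 10000
        Back Pressure Data Size Threshold  : 2 GB    (larger than the biggest merged FlowFile, so MergeContent is not blocked right after producing it)

Back pressure on the "merged" connection is not harmful by itself; it simply delays the next merge until PutSFTP drains the queue.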
PRO TIP: You mentioned that your daily merge size may vary from 10 GB to 300 GB. How to handle this most efficiently really depends on the number of FlowFiles and not so much on their size. The only thing to keep in mind with size thresholds is the content repository's disk limitations: total disk usage by the content repository is not equal to the size of the actively queued FlowFiles on the canvas, because content is immutable once created and NiFi stores FlowFile content in claims. NiFi holds FlowFile attributes/metadata in heap memory for better performance (swapping thresholds exist to help prevent out-of-memory issues, but performance suffers while swapping is happening). NiFi sets the default object threshold at 10,000 because swapping does not happen at that size. When merging very large numbers of FlowFiles, you can get better performance from two MergeContent processors in series instead of just one (a sketch of that two-stage configuration follows the article links below). To help you understand the above in more depth, I recommend reading the following two articles:
https://community.cloudera.com/t5/Community-Articles/Dissecting-the-NiFi-quot-connection-quot-Heap-u...
https://community.cloudera.com/t5/Community-Articles/Understanding-how-NiFi-s-Content-Repository-Arc...
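As an illustration of the two MergeContent processors in series mentioned in the tip above, here is a rough sketch using standard MergeContent properties; the entry counts and bin ages are assumptions you would tune to your actual daily FlowFile counts:

    MergeContent #1 (first-stage merge of raw FlowFiles):
        Merge Strategy              : Bin-Packing Algorithm
        Merge Format                : Binary Concatenation
        Minimum Number of Entries   : 1
        Maximum Number of Entries   : 1000
        Max Bin Age                 : 5 mins

    MergeContent #2 (second-stage merge of the pre-merged FlowFiles):
        Merge Strategy              : Bin-Packing Algorithm
        Merge Format                : Binary Concatenation
        Minimum Number of Entries   : 1
        Maximum Number of Entries   : 100
        Max Bin Age                 : 10 mins

With those example numbers a final merged FlowFile could represent up to 1,000 x 100 = 100,000 source files, while each individual merge only ever bins a modest number of FlowFiles, which keeps heap usage and swapping under control.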
Please help our community thrive. If any of the suggestions/solutions provided helped you solve your issue or answer your question, please take a moment to log in and click "Accept as Solution" on one or more of them.
Thank you,
Matt