Created 04-01-2024 03:05 AM
I am fetching file(s) from an HDFS path and transferring them to an SFTP server using NiFi. The HDFS file list is created by a Sqoop job, and the HDFS directory may contain one or more files.
Here is the list of processors I am using right now:
RouteOnAttribute --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> UpdateAttribute
My data flow starts from a single FlowFile produced by a Sqoop job, which then becomes many FlowFiles after executing the GetHDFSFileInfo Processor (based on the number of HDFS files). However, I require only a single FlowFile post PutSFTP for downstream processing of job completion.
Could you please suggest some solutions so that the processors after PutSFTP execute only once? Do we need to create a separate process group from GetHDFSFileInfo to PutSFTP?
My dataflow looks like below:
Created 04-01-2024 07:19 AM
@s198
In your use case you could probably handle this without needing a sub process group that uses FlowFile concurrency, since the GetHDFSFileInfo processor sets a number of attributes on the FlowFiles it produces that you can use to correlate the various FlowFiles to one another. Since you have already written all your individual HDFS files out to your SFTP server, you can remove all content from the FlowFiles using ModifyBytes (no sense in wasting CPU merging content you no longer need to keep), so all you have are zero-byte FlowFiles with attribute data. Then feed that stream of zero-byte FlowFiles to a MergeContent processor. I would configure your MergeContent as below:
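Something along these lines would do it (the exact values here are only examples; tune the entry counts and bin age for your environment):

Merge Strategy = Bin-Packing Algorithm
Merge Format = Binary Concatenation
Correlation Attribute Name = hdfs.path
Minimum Number of Entries = 100000   (set high so the bin is only released by age)
Maximum Number of Entries = 100000
Max Bin Age = 5 min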
What the above will do is start placing all FlowFiles that have the same value in their hdfs.path FlowFile attribute into the same MergeContent bin. The Minimum Number of Entries will prevent the MergeContent processor from merging the bin until that value is reached or until the Max Bin Age expires. The bin age starts as soon as the first FlowFile is allocated to the bin. Since we don't know how many files might be in any given HDFS directory, we are controlling the merge via bin age instead of number of FlowFiles. This builds some delay into your dataflow, but results in one FlowFile output for each HDFS directory listed and fetched. You can then take that one zero-byte merged FlowFile and use it to complete your single-FlowFile downstream processing of job completion.
While the above would work under ideal conditions, you should always design with the possibility of failure in mind. I would still recommend placing every component from "GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> UpdateAttribute" inside a process group configured with "FlowFile Concurrency = Single FlowFile Per Node" and "Outbound Policy = Batch Output". This would allow you to make sure that all fetched FlowFiles are successfully processed (written to the SFTP server) before any are output from the process group to the downstream MergeContent processor. You never know when an issue may prevent or slow writing to the SFTP server. This setup lets you handle those failures more easily and ensure any retries or errors are dealt with before exiting that process group and completing your job. It also allows you to set a much shorter Max Bin Age on your MergeContent processor, because all FlowFiles in the batch will have been fully processed before any are released, so they will all reach MergeContent at the same time.
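To illustrate, a rough sketch of how that could be wired up (the input/output port placement and exact ordering are my assumption of the layout described above):

Parent flow:
(Sqoop trigger FlowFile) --> [child process group] --> MergeContent (Correlation Attribute Name = hdfs.path) --> downstream job-completion handling

Inside the child process group (FlowFile Concurrency = Single FlowFile Per Node, Outbound Policy = Batch Output):
Input Port --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> UpdateAttribute --> Output Port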
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 04-02-2024 09:20 PM
Thank you @MattWho
I noticed that the number of files in the HDFS directory can be retrieved using the "hdfs.count.files" property. Can we utilize this property to initiate the merging process instead of "bin age"? If yes, could you please suggest what changes we need to make in the MergeContent processor?
Created 04-03-2024 11:24 AM
@s198
The hdfs.count.files attribute is added by the GetHDFSFileInfo processor as a FlowFile attribute on each FlowFile it produces. In order for a processor to utilize a FlowFile attribute, the processor property must support NiFi Expression Language (NEL).
The "Maximum Number of Entries" property on the MergeContent processor does not support Expression Language.
It would be difficult to support NiFi Expression Language on that property by the nature of the processor's design.
But you could set the "Correlation Attribute Name" property to an attribute that all split FlowFiles will have the same value for, such as "hdfs.path". That would put all FlowFiles with the same value in that FlowFile attribute in the same bin. This would allow you to process merges for multiple different HDFS directories concurrently.
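For example (the paths here are only illustrative), if GetHDFSFileInfo listed two directories in one run, the resulting FlowFiles might look like:

hdfs.path = /data/export/job_a  --> 5 FlowFiles  --> binned together --> 1 merged FlowFile
hdfs.path = /data/export/job_b  --> 12 FlowFiles --> binned together --> 1 merged FlowFile

Each distinct hdfs.path value gets its own bin, each bin is released when its Max Bin Age expires, and you end up with one merged FlowFile per directory.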
Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.
Thank you,
Matt
Created 04-05-2024 05:33 AM
Thank you @MattWho for your timely support and quick solutions. Kudos to you!
Created 04-05-2024 11:45 AM
Hi @MattWho
As you suggested, I tried a child process group as below with "FlowFile Concurrency = Single FlowFile Per Node" and "Outbound Policy = Batch Output" to ensure that all fetched FlowFiles are successfully processed before the MergeContent processor starts.
Input Port --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> Output Port
My GetHDFSFileInfo processor returns 20 HDFS files, and each execution successfully transfers 18 to 19 files to my SFTP server. However, during each execution, one or two file transfers fail in the PutSFTP processor with the error message "Failed to rename dot-file." An error screenshot is attached below.
I am facing this issue only when the child process group is configured with "Outbound Policy = Batch Output".
If I run the flow without the child process group, it works fine.
Am I missing some configuration settings here? Could you please help me fix the issue with the PutSFTP processor?
Created on 04-09-2024 04:31 AM - edited 04-09-2024 04:50 AM
Hi @MattWho