How to convert/merge Many flow files to single flow file in Nifi

Rising Star

I am fetching files from an HDFS path and transferring them to an SFTP server using NiFi. The HDFS file list is created by a Sqoop job, and the HDFS directory may contain one or more files.

Here is the list of processors I am using right now:

RouteOnAttribute --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> UpdateAttribute

My dataflow starts from a single FlowFile produced by a Sqoop job. After the GetHDFSFileInfo processor runs, it becomes many FlowFiles, one per HDFS file. However, I need only a single FlowFile after PutSFTP so that the downstream job-completion processing runs once.

Could you please suggest a way to execute the processors after PutSFTP only once? Do we need to create a separate process group spanning GetHDFSFileInfo to PutSFTP?
My dataflow looks like this:

[Screenshot: s198_0-1711965850897.png]

1 ACCEPTED SOLUTION

Super Mentor

This problem has been solved!

6 REPLIES

Rising Star

Thank you @MattWho 

I noticed that the number of files in the HDFS directory can be retrieved using the "hdfs.count.files" property. Can we utilize this property to initiate the merging process instead of "bin age"? If yes, could you please suggest what changes we need to make in the MergeContent processor?

Super Mentor

@s198 
The hdfs.count.files attribute is added to each FlowFile by the GetHDFSFileInfo processor. For a processor to use a FlowFile attribute, the relevant processor property must support NiFi Expression Language (NEL).
The "Maximum Number of Entries" property on the MergeContent processor does not support Expression Language.

It would be difficult to support NiFi Expression Language in such a processor by nature of its design.

But you could set the "Correlation Attribute Name" property to an attribute that has the same value on all of the split FlowFiles, such as "hdfs.path". All FlowFiles with the same value in that attribute would then be placed in the same bin, which lets you merge files from multiple different HDFS directories concurrently.
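
To make the binning idea concrete, here is a minimal Python sketch of grouping FlowFiles by a correlation attribute. It is only a conceptual model and not MergeContent's actual implementation: the function name `bin_by_correlation`, the sample paths, and the file names are all made up for illustration, and FlowFiles are represented as plain attribute dictionaries.

```python
from collections import defaultdict


def bin_by_correlation(flowfiles, correlation_attribute):
    """Group FlowFile attribute maps into bins keyed by one attribute's value."""
    bins = defaultdict(list)
    for attrs in flowfiles:
        # In this sketch, FlowFiles lacking the attribute share a bin keyed by None.
        bins[attrs.get(correlation_attribute)].append(attrs)
    return dict(bins)


# Hypothetical FlowFiles from two different HDFS directories.
flowfiles = [
    {"filename": "part-0", "hdfs.path": "/data/job_a"},
    {"filename": "part-1", "hdfs.path": "/data/job_a"},
    {"filename": "part-0", "hdfs.path": "/data/job_b"},
]

bins = bin_by_correlation(flowfiles, "hdfs.path")
for path, members in bins.items():
    print(path, [m["filename"] for m in members])
```

Each distinct "hdfs.path" value gets its own bin, so files from different directories are never merged together even when they arrive interleaved.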

Thank you,
Matt

Rising Star

Thank you @MattWho for your timely support and quick solutions. Kudos to you!

Rising Star

Hi @MattWho 

As you suggested, I tried a child process group configured with "FlowFile Concurrency = Single FlowFile Per Node" and "Outbound Policy = Batch Output", so that all fetched FlowFiles are processed successfully before the MergeContent processor starts. The flow inside the group is:
Input Port --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> Output Port
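
For readers unfamiliar with the "Batch Output" policy, the rough idea is that FlowFiles queued at the output port are held back until nothing in the process group is still being processed. The Python sketch below models only that gating rule; the function name and the sample FlowFile names are hypothetical, and it does not reproduce NiFi's scheduling.

```python
def release_from_output_port(in_flight, queued_at_output_port):
    """Model of Batch Output: release nothing while any FlowFile is still in flight."""
    if in_flight > 0:
        return []  # at least one FlowFile is still being processed inside the group
    return list(queued_at_output_port)


# Hypothetical batch of 3 files: two are done, one is still in PutSFTP.
partial = release_from_output_port(in_flight=1, queued_at_output_port=["f1", "f2"])

# Later, all three have reached the output port.
complete = release_from_output_port(in_flight=0, queued_at_output_port=["f1", "f2", "f3"])

print(partial, complete)
```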

My GetHDFSFileInfo processor returns 20 HDFS files, and each execution successfully transfers 18 to 19 files to my SFTP server. However, during each execution, one or two file transfers fail in the PutSFTP Processor with the error message 'Failed to rename dot-file.' An error screenshot is attached below

[Screenshot: Capture.JPG]

I face this issue only when the child process group is configured with "Outbound Policy = Batch Output".
Without the child process group, the transfer works.

Am I missing a configuration setting here? Could you please help me fix the issue with the PutSFTP processor?

Rising Star

Hi @MattWho

1) Initially, I hit the PutSFTP "Failed to rename dot-file" error only when the child process group was configured with "Outbound Policy = Batch Output". It worked without the child process group.
2) I set the PutSFTP failure retry attempts to 3, and that fixed the issue.
3) Later, I added a RouteOnAttribute after the FetchHDFS processor for some internal logic, and the PutSFTP error came back.
4) This time, I changed the "Run Schedule" of the PutSFTP processor from 0 sec to 3 sec, and that fixed it again.
5) I also need to transfer stats for each file (file name, row count, file size, etc.), so I added a second PutSFTP processor, and the issue reappeared.
6) Finally, I made the following changes to my PutSFTP processors:
       a) Set the failure retry attempts to 3 on both.
       b) Changed the "Run Schedule" of the first PutSFTP processor to 7 sec.
       c) Changed the "Run Schedule" of the second PutSFTP processor to 10 sec.

Now it is working fine. Are we getting this issue because 20 FlowFiles are being processed at the same time? Could you please suggest whether this is the right way to fix the PutSFTP "Failed to rename dot-file" error?
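
For context on the error message, the dot-file approach writes the upload under a temporary dot-prefixed name and renames it to the final name once the write completes. The Python sketch below illustrates that write-then-rename pattern with a bounded retry on the rename step. It uses the local filesystem as a stand-in for the SFTP server, and the function name, retry count, and delay are assumptions chosen for illustration; it is not PutSFTP's implementation and does not diagnose the cause of the failures described above.

```python
import os
import tempfile
import time


def put_with_dot_rename(directory, filename, data, attempts=3, delay_seconds=0.1):
    """Write to '.<filename>' first, then rename to '<filename>', retrying the rename."""
    temp_path = os.path.join(directory, "." + filename)
    final_path = os.path.join(directory, filename)

    with open(temp_path, "wb") as handle:
        handle.write(data)

    for attempt in range(1, attempts + 1):
        try:
            os.replace(temp_path, final_path)
            return attempt  # number of attempts the rename needed
        except OSError:
            if attempt == attempts:
                raise  # give up after the last allowed attempt
            time.sleep(delay_seconds)


# Local temporary directory standing in for the remote SFTP target.
with tempfile.TemporaryDirectory() as target:
    used = put_with_dot_rename(target, "part-0.csv", b"id,name\n1,a\n")
    listing = sorted(os.listdir(target))

print(used, listing)
```

A retry like this only papers over a transient rename failure; if the rename fails for a persistent reason, the final attempt still raises.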