How to convert/merge many FlowFiles into a single FlowFile in NiFi

Rising Star

I am fetching file(s) from an HDFS path and transferring them to an SFTP server using NiFi. The HDFS file list is created by a Sqoop job, and the HDFS directory may contain one or more files.

Here is the list of processors I am using right now:

RouteOnAttribute --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> UpdateAttribute

My data flow starts from a single FlowFile produced by a Sqoop job, which then becomes many FlowFiles after the GetHDFSFileInfo processor runs (one per HDFS file). However, I need only a single FlowFile after PutSFTP for downstream job-completion processing.

Could you please suggest a way to execute the processors after PutSFTP only once? Do we need to create a separate process group spanning GetHDFSFileInfo to PutSFTP?
My dataflow looks like the below:

s198_0-1711965850897.png

 


6 REPLIES

Master Mentor

@s198 

In your use case you could probably handle this without needing a sub process group using FlowFile concurrency, since the GetHDFSFileInfo processor sets a number of attributes on the FlowFiles it produces that you can use to correlate the various FlowFiles to one another.  Since you have already written all of your individual HDFS files out via your PutSFTP processor, you can remove all content from the FlowFiles using ModifyBytes (no sense in wasting CPU merging content you no longer need to keep), so all you have are zero-byte FlowFiles carrying attribute data.  Then feed that stream of zero-byte FlowFiles to a MergeContent processor.  I would configure your MergeContent as below:

  • Assume defaults for any property not mentioned here
  • Correlation Attribute Name = hdfs.path
  • Minimum Number of Entries = <set to a value higher than you would ever expect to be listed in any HDFS directory>
  • Maximum Number of Entries = <set to a value larger than the above>
  • Max Bin Age = <set to a value high enough to allow all files to reach this processor before the age expires>

What the above will do is place all FlowFiles that have the same value in their hdfs.path attribute into the same MergeContent bin.  The Minimum Number of Entries prevents the MergeContent processor from merging the bin until that count is reached or the Max Bin Age expires; the bin age starts as soon as the first FlowFile is allocated to the bin.  So, since we don't know how many files might be in any given HDFS directory, we are controlling the merge via bin age instead of number of FlowFiles.  This builds some delay into your dataflow, but results in one FlowFile output for each HDFS directory listed and fetched.  You can then take that single zero-byte merged FlowFile and use it to complete your single-FlowFile downstream processing of job completion.
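As a rough illustration only, the ModifyBytes and MergeContent settings might look like the sketch below. The numeric values are placeholders I am assuming for the example (a directory never holding more than a few thousand files, and all fetches completing within a few minutes); adjust them to your own directory sizes and transfer times.

    ModifyBytes
      Remove All Content         = true

    MergeContent
      Merge Strategy             = Bin-Packing Algorithm   (default)
      Correlation Attribute Name = hdfs.path
      Minimum Number of Entries  = 10000   (assumed to be far above any real directory's file count)
      Maximum Number of Entries  = 20000   (larger than the minimum)
      Max Bin Age                = 5 min   (the setting that actually triggers the merge in this flow)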

While the above would work under ideal conditions, you should always design with the possibility of failure in mind. I would still recommend placing every component from "GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> UpdateAttribute" inside a process group configured with "FlowFile Concurrency = Single FlowFile Per Node" and "Outbound Policy = Batch Output".  This lets you make sure that all fetched FlowFiles are successfully processed (written to the SFTP server) before any are output from the process group to the downstream MergeContent processor.  You never know when an issue may prevent or slow writing to the SFTP server.  This makes it easier to handle those failures and ensure any retries or errors are dealt with before exiting that PG and completing your job.  It also allows you to set a much shorter Max Bin Age on your MergeContent processor, since no FlowFile in the batch is released until all of them have been processed, so they will all reach MergeContent at the same time.
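A rough sketch of that layout, assuming the merge and job-completion handling stay outside the group (the group name is made up purely for illustration):

    Process Group "fetch-and-deliver"
      FlowFile Concurrency = Single FlowFile Per Node
      Outbound Policy      = Batch Output
      Inside the group:
        Input Port --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute
                   --> FetchHDFS --> PutSFTP --> ModifyBytes --> Output Port

    Outside the group:
      Output Port --> MergeContent (Correlation Attribute Name = hdfs.path, short Max Bin Age)
                  --> UpdateAttribute / job-completion handling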

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

 

Rising Star

Thank you @MattWho 

I noticed that the number of files in the HDFS directory can be retrieved from the "hdfs.count.files" attribute. Can we utilize this attribute to initiate the merge instead of relying on "bin age"? If yes, could you please suggest what changes we need to make in the MergeContent processor?

Master Mentor

@s198 
The hdfs.count.files attribute is added to each FlowFile by the GetHDFSFileInfo processor.  In order for a processor to utilize a FlowFile attribute in one of its properties, that property must support NiFi Expression Language (NEL).
The "Maximum Number of Entries" property on the MergeContent processor does not support expression language.

MattWho_0-1712160425546.png

It would be difficult to support NiFi Expression Language in such a processor by nature of its design.

But you could set the "Correlation Attribute Name" property to an attribute that all the split FlowFiles will have set to the same value, such as "hdfs.path".  That puts all FlowFiles with the same value in that attribute into the same bin, which also allows you to process merges for multiple different HDFS directories concurrently.
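For what it's worth, the attributes written by GetHDFSFileInfo can still be used in properties that do support NEL (RouteOnAttribute, UpdateAttribute, and so on). The expressions below are only illustrative examples of that, not anything MergeContent itself can evaluate:

    ${hdfs.count.files:gt(0)}                  (example RouteOnAttribute rule: directory is not empty)
    ${hdfs.path:substringAfterLast('/')}       (example UpdateAttribute value: just the directory name)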

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt

Rising Star

Thank you @MattWho for your timely support and quick solutions. Kudos to you!

Rising Star

Hi @MattWho 

As you suggested, I tried a child process group as below, with "FlowFile Concurrency = Single FlowFile Per Node" and "Outbound Policy = Batch Output", to ensure that all fetched FlowFiles are successfully processed before the MergeContent processor starts.
Input Port --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP --> ModifyBytes --> Output Port

My GetHDFSFileInfo processor returns 20 HDFS files, and each execution successfully transfers 18 to 19 files to my SFTP server. However, during each execution, one or two file transfers fail in the PutSFTP processor with the error message "Failed to rename dot-file." An error screenshot is attached below.

Capture.JPG

I am facing this issue only when the child process group is configured with "Outbound Policy = Batch Output".
If I run the flow without the child process group, it works fine.

Am I missing some configuration settings here? Could you please help to fix the issue with the PutSFTP processor?

Rising Star

Hi @MattWho 

 

1) Initially, I faced the "NiFi PutSFTP failed to rename dot file" issue only when the child process group was configured with "Outbound Policy = Batch Output". It worked without the child process group.
2) I set the PutSFTP failure retry attempts to 3, and that fixed the issue.
3) Later, I introduced a RouteOnAttribute after the FetchHDFS processor for some internal logic, and the PutSFTP error started again.
4) This time, I updated the "Run Schedule" of the PutSFTP processor from 0 sec to 3 sec, and that fixed the issue again.
5) I also have a requirement to transfer stats for each file (file name, row count, file size, etc.), so I introduced one more PutSFTP processor, and the issue popped up again.
6) Finally, I made the following changes to both of my PutSFTP processors (summarized in the sketch below):
       a) Set the failure retry attempts to 3.
       b) Modified the "Run Schedule" of the first PutSFTP processor to 7 sec.
       c) Modified the "Run Schedule" of the second PutSFTP processor to 10 sec.
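Roughly, those changes map to the following settings. The retry counts and run schedules are just the values that happened to work in this flow, not general recommendations, and the relationship-level retry options are only available on the processor's Relationships tab in recent NiFi versions:

    Both PutSFTP processors
      Relationships tab: failure -> Retry, Number of Retry Attempts = 3
    First PutSFTP
      Scheduling tab: Run Schedule = 7 sec
    Second PutSFTP
      Scheduling tab: Run Schedule = 10 sec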
 
Now it is working fine. Are we getting this issue because 20 FlowFiles are being processed at the same time? Could you please suggest whether this is the right way to fix the "NiFi PutSFTP failed to rename dot file" issue?