
How to copy/merge multiple files from HDFS to an SFTP server using Apache NiFi

Rising Star

We have the below requirements in our NiFi flow:

a) Fetch a file from HDFS and transfer it to a Linux server. I have achieved this using the FetchHDFS and PutSFTP NiFi processors, and it is working fine as expected for a single file.

Now, we have a requirement that this HDFS directory may contain multiple files, and all of them need to be transferred to the SFTP server. I tried passing the HDFS directory path in the "HDFS File Name" Property of the FetchHDFS processor; however, it seems to accept only file names. Could you please suggest an alternative processor/method to achieve this?

I tried to use ListHDFS -> FetchHDFS -> PutSFTP 

Actually, the HDFS files are the output of Sqoop jobs. I tried connecting RouteOnAttribute to ListHDFS, but that connection is not allowed.

b) The second requirement is to combine all the files in a specific HDFS directory. Can we achieve this using the MergeContent processor?

Could you please provide some inputs?


7 Replies

Community Manager

@s198 Welcome to our community! To help you get the best possible answer, I have tagged our NiFi/HDFS experts @ckumar @Rajat_710 who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Master Mentor

@s198 

The FetchHDFS processor is designed to be used in conjunction with the ListHDFS processor. The ListHDFS processor connects to HDFS and generates a NiFi FlowFile for each file listed from HDFS without fetching the content of that HDFS file. The produced 0-byte FlowFiles contain FlowFile attributes that are then used by the FetchHDFS processor to obtain the actual content and insert it into the FlowFile's content.
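As a rough sketch (all directory, host, and credential values below are placeholders, not taken from your environment), the basic wiring could look like:

    ListHDFS
        Hadoop Configuration Resources : /path/to/core-site.xml,/path/to/hdfs-site.xml   (placeholder paths)
        Directory                      : /your/hdfs/source/dir                           (placeholder)

    FetchHDFS
        HDFS File Name                 : ${path}/${filename}    (default; resolves from the attributes ListHDFS writes)

    PutSFTP
        Hostname                       : sftp.example.com       (placeholder)
        Port                           : 22
        Username                       : your-user              (placeholder)
        Remote Path                    : /your/remote/dir       (placeholder)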

NiFi has numerous list/fetch sets of processors. They were designed for sources that are not NiFi-cluster friendly (meaning the client does not support a distributed fetch capability that would avoid data duplication). So in a NiFi cluster, the List<abc> processor would be configured to run on the primary node only, so that only one node in the NiFi cluster gets the metadata about all the source files to be ingested by the cluster. The List<abc> processor would then be connected via a NiFi connection to the Fetch<abc> processor. The connection between these two processors would be configured to load-balance the 0-byte FlowFiles across all nodes in the NiFi cluster. Then the Fetch<abc> processor could run on all nodes. Since each node in the cluster has a subset of the listed files, there is no duplication and the load/work is distributed across the NiFi cluster.
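A hedged sketch of that cluster configuration (setting names as they appear in the NiFi UI; the exact labels may differ slightly by version):

    ListHDFS  (Scheduling tab)
        Execution               : Primary node

    Connection ListHDFS -> FetchHDFS  (connection settings)
        Load Balance Strategy   : Round robin

    FetchHDFS  (Scheduling tab)
        Execution               : All nodes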

If you are not using a NiFi cluster but a standalone single NiFi instance, you could use the GetHDFS processor instead. But if you ever plan to expand to a NiFi cluster, it is best to build your dataflows now with that in mind to avoid extra work later.

If any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them.

Thank you,
Matt

 

Rising Star

Hi @MattWho 

Thank you for your detailed explanation, but my issue is still not resolved.

Yes, I am using a NiFi cluster.

The multiple HDFS files are generated by a Sqoop job. (Earlier we were getting only one file as the output of the Sqoop job, so we used Sqoop completion --> RouteOnAttribute --> FetchHDFS --> PutSFTP to load the file to the SFTP server. Please note that we have a lot of processors before Sqoop, so we cannot change any existing functionality.)

We are enhancing the existing dataflow to process multiple files instead of a single file. From RouteOnAttribute, I am not able to connect to the ListHDFS processor; I think by design that connection is not allowed. Could you please suggest a way to connect to the ListHDFS processor from RouteOnAttribute, so that I can fetch multiple files and load them to my SFTP server?

I have one more requirement to combine the files and generate a single file in the HDFS path. Could you please provide some inputs on that as well?

Master Mentor

@s198 

The List<abc> type processors are source processors that do not accept inbound connections, since they are designed to create FlowFiles rather than to modify existing FlowFiles.

I am not clear on what "So we used Sqoop completion" does to create a FlowFile in your NiFi dataflow, which is then passed to RouteOnAttribute (assuming this is the processor you are referring to as "Router Attribute") via a connection.

What attributes exist on the FlowFile being processed by the RouteOnAttribute processor? Does this FlowFile have any attributes describing the specific file that needs to be fetched by the FetchHDFS processor (like filename and path)?

-----
If the Sqoop job output produced 1 FlowFile for each HDFS file to be fetched, and each of those FlowFiles has attributes for the path and filename of the HDFS file content to be fetched, you could do the following:

Set the default NiFi Expression Language statement "${path}/${filename}" in the "HDFS File Name" property of the FetchHDFS processor. Those two FlowFile attributes are expected to be in the following format:

Attribute Name / Attribute Value:
filename - The name of the file that will be read from HDFS.
path - The absolute path of the file's directory on HDFS. For example, "/tmp/abc/1/2/3".

Attribute names are case sensitive.
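For example (hypothetical attribute values), a FlowFile carrying:

    path     = /tmp/abc/1/2/3
    filename = part-m-00000

would cause the "${path}/${filename}" expression to resolve to /tmp/abc/1/2/3/part-m-00000, which is the full HDFS path FetchHDFS will read.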
-----

If the Sqoop job simply outputs 1 FlowFile from which you expect to fetch a lot of HDFS files, that is not how FetchHDFS functions. FetchHDFS expects one FlowFile for each HDFS file whose content is being fetched. FetchHDFS does not create new FlowFiles; it only adds content to an existing FlowFile.

If this matches your scenario, you may be able to use the GetHDFSFileInfo processor, which does accept an inbound connection. It can be configured with just a path. If you set "Group Results = None" and "Destination = Attributes", you could send the produced FlowFiles to FetchHDFS to get the content for each FlowFile output. You would still need your RouteOnAttribute processor to make sure only FlowFiles where "${hdfs.type} = file" are routed to FetchHDFS and other types are discarded. You would probably also want an UpdateAttribute processor so you could set the filename of the FlowFile to the hdfs.objectName (done by adding the dynamic property filename = ${hdfs.objectName}). Then feed those FlowFiles to your FetchHDFS processor configured to use the ${hdfs.path}${hdfs.objectName} NiFi Expression Language statement in the "HDFS File Name" property.
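A rough property sketch of that chain, with placeholder values (verify the exact property names against the documentation for your NiFi version):

    GetHDFSFileInfo
        Full path            : /your/hdfs/source/dir      (placeholder)
        Group Results        : None
        Destination          : Attributes

    RouteOnAttribute
        Routing Strategy     : Route to Property name
        file (dynamic)       : ${hdfs.type:equals('file')}

    UpdateAttribute
        filename (dynamic)   : ${hdfs.objectName}

    FetchHDFS
        HDFS File Name       : ${hdfs.path}${hdfs.objectName}
        (if hdfs.path does not carry a trailing slash in your environment, use ${hdfs.path}/${hdfs.objectName} instead)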

------

If any of the suggestions/solutions provided helped you with your issue, please take a moment to login and click "Accept as Solution" on one or more of them.

Thank you,
Matt

Rising Star

Hi @MattWho 

Thanks a lot for the suggestions. The below set of processors helped me copy multiple HDFS files to the SFTP server:
RouteOnAttribute --> GetHDFSFileInfo --> RouteOnAttribute --> UpdateAttribute --> FetchHDFS --> PutSFTP

But I still have one issue here. Based on the number of HDFS files (or the number of FlowFiles), all the downstream processors of PutSFTP are executing the same number of times. Is there a way to restrict that? Essentially, I would like to execute all the downstream processors of PutSFTP only once, similar to when we process a single HDFS file.

It would be great if you could provide some inputs on this.

Master Mentor

@s198 
Great to hear the suggestions I provided solved your question in this community thread.
We encourage our community members to start new threads for unrelated questions, so that what solved the issue in a given thread remains clear to other community users who may come across it.

That being said, my understanding of this new query is that you have a dataflow that starts from a single FlowFile produced by your Sqoop job, which then becomes many FlowFiles, but you require only a single FlowFile post PutSFTP for downstream processing of job completion. That could be solved using the Wait and Notify processors, which can be complicated to set up, or using the "FlowFile Concurrency" capability on a Process Group.
I shared a similar solution in a few other community posts on how this works:

https://community.cloudera.com/t5/Support-Questions/How-to-detect-all-branches-in-a-NiFi-flow-have-f...

https://community.cloudera.com/t5/Support-Questions/NiFi-Trigger-a-Processor-once-after-the-Queue-ge...
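As a hedged sketch of the Process Group option (available in newer NiFi releases; labels may differ slightly by version), you would place the multi-file portion of the flow (GetHDFSFileInfo through PutSFTP) inside a child Process Group configured roughly as:

    Process Group configuration
        FlowFile Concurrency   : Single FlowFile Per Node
        Outbound Policy        : Batch Output

so the group takes in one FlowFile from the Sqoop-completion side at a time and only releases its output once everything inside has completed; the posts linked above describe how this pattern works in more detail.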

Please help the community grow and assist others in finding solutions that help or solve issues by taking a moment to login and click "Accept as Solution" below any response(s) that helped you.

Thank you,
Matt

Rising Star

Thank you @MattWho. You are awesome!