Create files from GetHDFS processor flowfiles

Rising Star


I would like to create a NiFi flow that parses a number of files from HDFS with the GetHDFS processor, creates a separate file for each incoming flowfile with the same name but a different extension, and finally writes both the original HDFS file and the newly created file to a remote location via FTP. Which NiFi processor would be suitable for this task?

The flow I imagine is as follows:

GetHDFS ----> *Nifi processor* --> PutFTP


Rising Star

I figured the GenerateFlowFile processor could be a solution here, so I would have two flows with the same start and end points:

GetHDFS --> PutFTP for the original HDFS files

GetHDFS --> GenerateFlowFile --> PutFTP for the newly created files based on the original HDFS ones

However, the GUI won't let me connect the GetHDFS processor to the GenerateFlowFile one. Some input here would be really appreciated!

Try:

                        /--> UpdateAttribute (filename) --> PutFTP
ListHDFS --> FetchHDFS ------> MergeContent --> PutFTP

Super Guru

@Foivos A,

The GenerateFlowFile processor won't accept any incoming connections.

Use one GetHDFS processor and connect its success relationship twice. Connect it once directly to a PutFTP processor, which keeps the same file name.

Connect it a second time to an UpdateAttribute processor, add a property there to change the file name of the flowfile, and send the result to a second PutFTP processor.

The first PutFTP stores the HDFS files as is in your remote location.

The second PutFTP stores the parsed and renamed files in your remote location.


Add a new property to UpdateAttribute as below:


This UpdateAttribute changes the filename to the same filename as before, with .parsed added as the extension. This way you can easily tell which files have been parsed and which have not.
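To make the rename concrete: the property would presumably be named filename, with a value such as ${filename}.parsed (append the extension) or ${filename:substringBeforeLast('.')}.parsed (replace the extension). Both values are assumptions, since the screenshot is not available. The shell sketch below only mimics the two results; append_parsed and swap_extension are hypothetical helpers and data.csv is a made-up file name.

```shell
# Mimics ${filename}.parsed: keep the full name and append .parsed.
append_parsed() {
    printf '%s.parsed\n' "$1"
}

# Mimics replacing the extension: strip the last ".something", then add .parsed.
swap_extension() {
    printf '%s.parsed\n' "${1%.*}"
}

append_parsed data.csv
swap_extension data.csv
```

The first form keeps the original extension visible in the new name; the second matches the "same name but different extension" wording of the question.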

UpdateAttribute configs:


  1. Just a reminder: the GetHDFS processor by default deletes the HDFS files once it has fetched them. If you don't want the files deleted, set the property Keep Source File to true (by default this property is set to false).

It's better to use the ListHDFS and FetchHDFS processors to fetch the files from HDFS, because ListHDFS stores state and picks up changes in the directory by itself.

If you are thinking of using the ListHDFS and FetchHDFS processors, use the link below, where I have explained how to configure them:



Hope this helps!

Super Guru

@Foivos A,

In this screenshot I have used the GetHDFS processor, but you can replace it with:

ListHDFS --> FetchHDFS --> PutFTP (files without parsing) --> UpdateAttribute (files that got parsed) --> PutFTP


  1. Once you have all the listed files from the HDFS directory, use a PutFTP processor and connect the success relationship of the first PutFTP processor to the UpdateAttribute processor that follows.
  2. Here I have looped failure and reject back to the same PutFTP processor, so only the files that were stored in the remote location move on from the first PutFTP processor.
  3. All the files that were rejected or failed to be stored in the remote location are routed back to retry with the same processor.

Only the files that were successfully stored in the remote location are routed to the UpdateAttribute processor, where you can parse and rename them before storing them in the remote location with the second PutFTP processor.

In this method we only process the files that the first PutFTP processor stored successfully (because only success is connected from the first PutFTP to UpdateAttribute). For the other relationships, such as reject or failure, you can either auto-terminate them or retry the store.


Note that we have looped failure and reject back to the same processor. Files in these relationships keep retrying against the same processor, which puts more load on the cluster.

Rising Star

@Shu Your answer is really awesome and exactly what I would normally need; however, there is one problem.

Due to connectivity and rerouting issues with the FTP server I am trying to transfer the files to, and because the PutFTP processor cannot be configured to handle these issues, I decided to use a command-line FTP utility (ncftp in this case) that makes the FTP connection and transfers the files.

At first I thought of creating a ListHDFS --> FetchHDFS --> ExecuteProcess flow, but the ExecuteProcess processor does not accept incoming connections. Note that the ExecuteProcess processor on its own works like a charm if I configure it to execute either a bash script file with the ncftp commands in it, or the ncftp utility directly with the correct arguments.

So I replaced it with the ExecuteStreamCommand processor, which accepts incoming connections:

The configuration of the processor:

The correct command, which works from the command line and from the ExecuteProcess processor, would be:

ncftpput -f /path/to/login.txt /path/to/ftp/remote /path/to/localdir/filename

Unfortunately the flow does not work, and I get no error messages whatsoever to help me find out what is going wrong.

I've also tried setting Ignore STDIN to true and explicitly specifying the local path to the files I want to transfer, but this does not work either.

What I want to do is list the HDFS contents, fetch the files and pass their names to the ExecuteStreamCommand processor as the last argument. In the FetchHDFS processor there is the ${path}/${filename} HDFS Filename property, and in the ExecuteStreamCommand processor I've set ${filename} as the last argument, but apparently the files are not stored anywhere locally on the NiFi server and they get lost somewhere in the flow after the FetchHDFS processor. How would you approach this issue? Any other ideas on how we could achieve the same result?

Super Guru

Hi @Foivos A,

I tried a simple flow that passes attributes to ExecuteStreamCommand, using a sample shell script that lists the files in a directory:

bash# cat
# List the .txt files under the HDFS path formed by joining the two arguments
for filename in $(hadoop fs -ls "${1}${2}" | awk '{print $NF}' | grep '\.txt$')
do echo "$filename"; done

Run the script:

bash#./ /user/yashu/ folder2

This script takes two arguments, which become ${1} and ${2} above, and lists the files in the folder2 directory.

When running this script from the NiFi ExecuteStreamCommand processor, configure the processor as follows.

In my flow every flowfile has dir_name and folder_name as attributes, with values like:

dir_name has the value:


folder_name has the value:


Attributes for the flowfile:


Processor configs:


Make sure the script that NiFi runs calls the original shell script with two arguments, as shown below.

bash# cat
/tmp/ $1$2

In my case NiFi calls this script, which in turn calls the original script with the two arguments, as mentioned above.

Only then does the ExecuteStreamCommand processor take the two arguments you specified in the Command Arguments property, in our case ${dir_name};${folder_name}, and pass them to the script.

How NiFi executes the script:

When NiFi executes the script, it calls a script that accepts two arguments. Since we pass those arguments from NiFi, ${dir_name} is treated as $1 and ${folder_name} as $2, and the script lists all the files.
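The mapping from Command Arguments to positional parameters can be imitated in plain shell. This is only a sketch of the behaviour described above, assuming the default ';' argument delimiter; split_args is a hypothetical helper, and the sample values are the ones from the command-line run earlier in this post.

```shell
# Split a ';'-delimited string the way the Command Arguments value is split,
# then print each piece next to the positional parameter it would become.
split_args() (
    IFS=';'      # use ';' as the only field separator (subshell keeps this local)
    set -f       # disable globbing while the string is being split
    set -- $1    # unquoted on purpose so that splitting on ';' happens
    i=1
    for arg in "$@"; do
        printf '$%s=%s\n' "$i" "$arg"
        i=$((i + 1))
    done
)

# ${dir_name};${folder_name} after attribute substitution (sample values)
split_args "/user/yashu/;folder2"
```

The script named in Command Path then sees the first piece as $1 and the second as $2.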


Rising Star

Hi @Shu,

Thanks for the response. What do you mean by the flowfile attributes part? How do you pass the values to the arguments?

What I want to do is run an ncftpput command where the last argument, shown in {}, is the file names from the FetchHDFS processor, passed automatically to the ExecuteStreamCommand processor. The other arguments are fixed and don't need to be automated; only the filename(s) to be transferred to the FTP server do.

Command: ncftpput -f /path/to/login.txt /path/to/ftp/remote {HDFS_files}

So if the ListHDFS --> FetchHDFS flow outputs, for example, two files in the specified HDFS directory, /HDFS/path/file1 and /HDFS/path/file2, how can we pass these as arguments to the ncftpput command in the ExecuteStreamCommand processor? The goal is to transfer the files in the HDFS directory to the FTP server with the ExecuteStreamCommand processor.

Rising Star

Hi @Shu, thanks for all the help. I am not sure I follow your logic here, though:

The flow starts with the ListHDFS processor, where the HDFS directory is specified, e.g. /user/foivos

The FetchHDFS processor follows, with the HDFS Filename specification. Let's say we want to take only csv files, so the HDFS Filename property is ${path}/${filename:endsWith('csv')}

The final processor is the ExecuteStreamCommand where:

  • Command path: /path/to/
  • Command arguments: ${filename}, but where does the processor take this value from? Is it the ${filename} attribute passed on from the FetchHDFS processor, where the filtering for csv files is also done?
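One thing worth checking in the setup above: in the NiFi Expression Language, endsWith returns true or false rather than the name itself, so ${path}/${filename:endsWith('csv')} would likely resolve to something like /user/foivos/true. The shell sketch below only mimics that evaluation; ends_with_csv is a hypothetical helper and report.csv is a made-up file name.

```shell
# Hypothetical stand-in for ${filename:endsWith('csv')}:
# prints "true" or "false", never the file name itself.
ends_with_csv() {
    case $1 in
        *csv) echo true ;;
        *)    echo false ;;
    esac
}

hdfs_dir=/user/foivos   # directory from the ListHDFS example above
filename=report.csv     # made-up file name for illustration

# What the property resolves to with the boolean in place of the name:
echo "${hdfs_dir}/$(ends_with_csv "$filename")"

# What FetchHDFS needs in order to locate the file:
echo "${hdfs_dir}/${filename}"
```

If that is what is happening, one option is to leave HDFS Filename at ${path}/${filename} and do the csv filtering separately, for example by using the boolean expression as a RouteOnAttribute property.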


Super Guru

@Foivos A,

Keep your ftp command in a shell script, and keep this shell script on the local file system, because you want to list HDFS files:


ncftpput -f /path/to/login.txt /path/to/ftp/remote "${1}"

Then call the above shell script (script 1) from a second shell script (script 2). Include $1 in script 2, because script 1 accepts one argument; only then does script 2 receive the arguments from NiFi and call script 1 with them.


<>/ $1

Then call it from the ExecuteStreamCommand processor and specify your filename in the Command Arguments property.
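A point that may explain the "lost" files: ExecuteStreamCommand does not write the flowfile to local disk. With Ignore STDIN left at false, it streams the flowfile content to the command's standard input. A wrapper along these lines could stage that content in a temporary file and hand it to ncftpput. This is a minimal sketch, not a tested setup: upload_from_stdin, UPLOADER, LOGIN_FILE and REMOTE_DIR are hypothetical names, and the default paths are the placeholders used earlier in the thread.

```shell
# Reads flowfile content from stdin, stores it under the name given as $1,
# uploads that file, then removes the temporary copy.
upload_from_stdin() (
    name=$1
    workdir=$(mktemp -d) || exit 1

    # Stage the streamed content as a local file with the original name.
    cat > "${workdir}/${name}" || { rm -rf "$workdir"; exit 1; }

    # UPLOADER can be overridden for a dry run without an FTP server.
    "${UPLOADER:-ncftpput}" \
        -f "${LOGIN_FILE:-/path/to/login.txt}" \
        "${REMOTE_DIR:-/path/to/ftp/remote}" \
        "${workdir}/${name}"
    status=$?

    rm -rf "$workdir"
    exit "$status"
)

# When saved as the script NiFi runs, the file name arrives as the first argument.
if [ "$#" -ge 1 ]; then
    upload_from_stdin "$1"
fi
```

In the processor, Command Path would point at this script and Command Arguments would be ${filename}. A non-zero exit status from the upload is passed through, so the processor can see when a transfer failed.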



I would love to make this flow complicated.

> What if one PutFTP succeeds and the other fails? 🙂 Do you want one to succeed and the other to fail, or do you want to handle it with a retry?

Rising Star

Interesting. Definitely the latter: I would like to handle it as a retry. The files should be transferred only in pairs. How would you approach it?


One more point: use ListHDFS and FetchHDFS rather than GetHDFS. If you are using TDE, there is a bug in GetHDFS that can occur in rare scenarios.

Rising Star

Hi @Shu, I've posted a new question here. I hope what I am trying to do in this use case is clear!
