
Create files from GetHDFS processor flowfiles

Expert Contributor

Hello,

I would like to create a NiFi flow that parses a number of files from HDFS with the GetHDFS processor, creates a separate file for each incoming flowfile with the same name but a different extension, and finally writes both the original HDFS file and the newly created file to a remote location via FTP. Which NiFi processor would be suitable for this task?

The flow I imagine is as follows:

GetHDFS ----> *NiFi processor* --> PutFTP

13 REPLIES

Expert Contributor

I figured the GenerateFlowFile processor could be a solution here, so I would have two flows with the same start and end points, like:

GetHDFS --> PutFTP for the original HDFS files

GetHDFS --> GenerateFlowFile --> PutFTP for the newly created files based on the original HDFS ones

However, the GUI won't let me connect the GetHDFS processor to the GenerateFlowFile one; some input here would be really appreciated!

Contributor
Try:
ListHDFS --> FetchHDFS --> UpdateAttribute (filename) --> PutFTP
                      \--> MergeContent --> PutFTP

Master Guru

@Foivos A,

The GenerateFlowFile processor won't accept any incoming connections.

Use one GetHDFS processor and connect its success relationship twice: route one success connection directly to the first PutFTP processor, keeping the same file name.

Route the second success connection to an UpdateAttribute processor, and there add a property that changes the filename of the flowfile.

The first PutFTP stores the HDFS files as they are into your remote location.

The second PutFTP stores the parsed (renamed) HDFS files into your remote location.

UpdateAttribute:-

Add a new property named filename to the UpdateAttribute processor, with the value below:

${filename}.parsed

This UpdateAttribute changes the filename to the same filename as before, with .parsed appended as the extension; by using this method you can easily tell which files have been parsed and which have not.
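For example, a file that arrives as data.csv (an illustrative name, not taken from the flow above) would leave UpdateAttribute as data.csv.parsed. If, as in the original question, the goal is to swap the extension rather than append one, a possible alternative value would be ${filename:substringBeforeLast('.')}.parsed, using the standard Expression Language substringBeforeLast function, which would rename data.csv to data.parsed.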

UpdateAttribute Configs:-

(screenshot: update-attr-filename.png)

  1. Just a reminder: the GetHDFS processor by default deletes the HDFS files once it gets them. If you don't want to delete the files, change the Keep Source File property to true (by default this property is set to false).

It's better to use the ListHDFS and FetchHDFS processors to fetch the files from HDFS, as the ListHDFS processor stores state and will catch changes in the directory by itself.

***FYI***
If you are thinking of using the ListHDFS and FetchHDFS processors, then use the link below, where I have explained how to configure them:

https://community.hortonworks.com/questions/139391/hello-all-what-are-the-differences-between-servic...
****

Flow:-

(screenshot: flow-gethdfs.png)

Hope this helps...!!!

Master Guru

@Foivos A,

New_Flow:-
In this screenshot I have used the GetHDFS processor, but you can replace it with:

ListHDFS --> FetchHDFS --> PutFTP (files without parsing) --> UpdateAttribute (files that got parsed) --> PutFTP

(screenshot: new-flow.png)

  1. Once you have got all the listed files from the HDFS directory, use the PutFTP processor and route the success relationship of the first PutFTP processor to the next UpdateAttribute processor.
  2. Here I have looped the failure and rejected relationships back to the same PutFTP processor, so we process onwards only the files that got stored into the remote location by the first PutFTP processor.
  3. All the files that got rejected or failed to store into the remote location are routed back to retry one more time with the same processor.

Only the files that got successfully stored into the remote location will be routed to the UpdateAttribute processor; there you can parse and change the names of the files before storing them to the remote location (in the second PutFTP processor).

In this method we only process the files that got successfully stored into the remote location by the first PutFTP processor (as we are routing only success from the first PutFTP to UpdateAttribute). For all the other relationships, like rejected or failure, you can auto-terminate them or retry them.

Note:-

We have looped failure and rejected back to the same processor; if some files end up in these relationships they will keep retrying on the same processor, which will put more load on the cluster.

Expert Contributor

@Shu Your answer is really awesome and exactly what I would normally need; however, there is one problem.

Due to connectivity and rerouting issues with the FTP server I am trying to transfer the files to, and because the PutFTP processor cannot be configured to handle these issues, I decided to use a command-line FTP utility (ncftp in this case) to make the FTP connection and transfer the files.

At first I thought to create a ListHDFS --> FetchHDFS --> ExecuteProcess flow, but the ExecuteProcess processor does not accept incoming connections. Note here that the ExecuteProcess processor on its own works like a charm if I configure it to execute either a bash script file with the ncftp commands in it, or the ncftp utility directly with the correct arguments and no bash script.

So I replaced it with the ExecuteStreamCommand processor, which accepts incoming connections.

The configuration of the processor:

The correct command, which works from the command line and from the ExecuteProcess processor, would be:

ncftpput -f /path/to/login.txt /path/to/ftp/remote /path/to/localdir/filename
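For reference, the file passed to -f is a plain-text login file in ncftp's host/user/pass format; a minimal sketch might look like the lines below (the hostname and credentials are placeholders, not values from this thread):

bash# cat /path/to/login.txt    # placeholder host and credentials only
host ftp.example.com
user myuser
pass mypassword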

But unfortunately the flow does not work, and I get no error messages whatsoever to debug and find out what could possibly be going wrong.

I've also tried to set Ignore STDIN to true and specify the local path to the filenames I want to transfer explicitly, but this does not work either.

What I would want to do is list the HDFS contents, fetch the filenames, and pass these filenames to the ExecuteStreamCommand processor as its last argument. In the FetchHDFS processor there is the ${path}/${filename} HDFS Filename property, and in the ExecuteStreamCommand processor I've set ${filename} as the last argument, but apparently the files are not stored locally anywhere on the NiFi server and are lost somewhere along the flow after the FetchHDFS processor. My question is: how would you approach this issue? Any other ideas on how we could achieve the same result?

Master Guru

Hi @Foivos A,

I tried a simple flow passing attributes to ExecuteStreamCommand, with a sample shell script that lists files in a directory:

bash# cat hadoop.sh
# list all .txt files under the HDFS path built from the two arguments
for filename in `hadoop fs -ls ${1}${2} | awk '{print $NF}' | grep '\.txt$'`
do echo $filename; done

Run script:-

bash# ./hadoop.sh /user/yashu/ folder2
/user/yashu/folder2/part1.txt
/user/yashu/folder2/part1_sed.txt

This script takes two arguments, passes them to ${1} and ${2} above, and lists out the files in the folder2 directory.

When we run this script from the NiFi ExecuteStreamCommand processor, configure that processor as shown below.

In my flow every flowfile has dir_name and folder_name attributes, with values like:

dir_name = /user/yashu/

folder_name = folder2

Attributes of the flowfile:-

(screenshot: ff-attributes.png)

Processor configs:-

(screenshot: executecommand.png)

Make sure your NiFi run script calls the original shell script (hadoop.sh) with the two arguments, as mentioned below.

bash# cat hadoop_run.sh
# wrapper: forwards the two arguments it receives from NiFi to hadoop.sh
/tmp/hadoop.sh $1 $2

In my case I'm calling hadoop_run.sh from NiFi, which calls the hadoop.sh script with the two arguments, as mentioned above.

The ExecuteStreamCommand processor then takes the two arguments you have specified in the Command Arguments property, in our case ${dir_name};${folder_name}, and passes those arguments to the script.

How NiFi Executes Script:-

When NiFi executes the hadoop_run.sh script, it calls the hadoop.sh script, which accepts two arguments; of the arguments we pass from NiFi, ${dir_name} is treated as $1 and ${folder_name} as $2, and the script lists all the files.
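Putting it together, for the sample flowfile above (dir_name = /user/yashu/ and folder_name = folder2) the processor effectively runs the equivalent of the command below; the /tmp location for hadoop_run.sh is an assumption, mirroring where hadoop.sh lives:

bash# /tmp/hadoop_run.sh /user/yashu/ folder2    # assumed path for the wrapper script
/user/yashu/folder2/part1.txt
/user/yashu/folder2/part1_sed.txt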


(screenshots: executecommand.png, stream-flow.png)

Expert Contributor

Hi @Shu,

Thanks for the response. What do you mean with the flowfile attributes part; how do you pass the values to the arguments?

What I want to do is run an ncftpput command where the last argument, shown in {} below, is the filenames from the FetchHDFS processor, passed automatically to the ExecuteStreamCommand processor. The other arguments are fixed and we don't have to automate them, only the filename(s) to be transferred to the FTP server.

Command: ncftpput -f /path/to/login.txt /path/to/ftp/remote {HDFS_files}

So if the ListHDFS --> FetchHDFS flow outputs, let's say, two files in the specified HDFS directory, /HDFS/path/file1 and /HDFS/path/file2, how can we pass these as arguments to the ncftpput command in the ExecuteStreamCommand processor? The goal is to transfer the files in the HDFS directory to the FTP server with the ExecuteStreamCommand processor.

Expert Contributor

Hi @Shu, thanks for all the help; I am not sure I follow your logic here though:

The flow starts with the ListHDFS processor, where the HDFS directory is specified, e.g. /user/foivos.

The FetchHDFS processor follows, with the HDFS Filename specification. Let's say we want to take only csv files, so the HDFS Filename property is ${path}/${filename:endsWith('csv')}.

The final processor is the ExecuteStreamCommand where:

  • Command Path: /path/to/script.sh
  • Command Arguments: ${filename}, but where does the processor take this value from? Is it the ${filename} property coming from the FetchHDFS processor, where the filtering for csv files is also done?

(screenshot: flow.png)

Master Guru

@Foivos A,

Keep your FTP command in a shell script, and keep this shell script on the local machine, because you want to list the HDFS files.

Script1:-

ftp.sh

ncftpput -f /path/to/login.txt /path/to/ftp/remote ${1}
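As a quick sanity check (purely illustrative, reusing the local path from the ncftpput command earlier in the thread), script 1 can be tried by hand with a single local file before wiring it into NiFi:

bash# ./ftp.sh /path/to/localdir/filename    # illustrative local file path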

Then call the above script 1 from shell script 2. Include $1 in script 2, because script 1 accepts one argument; only then does script 2 receive the argument from NiFi and call script 1 with it.

Script2:-

streamcommand_ftp.sh

<path-to-ftp.sh>/ftp.sh $1

Then call streamcommand_ftp.sh from the ExecuteStreamCommand processor and set the Command Arguments property to your filename.
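So, for a flowfile whose filename attribute happens to be file1 (a hypothetical name, borrowed from the /HDFS/path/file1 example earlier in the thread), the chain roughly expands to:

# ExecuteStreamCommand runs script 2 with the filename attribute as its single argument
./streamcommand_ftp.sh file1
# script 2 forwards that argument to script 1
./ftp.sh file1
# script 1 finally pushes the file to the remote FTP location
ncftpput -f /path/to/login.txt /path/to/ftp/remote file1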

(screenshot: steam.png)