Created 09-27-2017 02:24 PM
Hello,
i would like to create a Nifi flow that parses a number of files from HDFS with GetHDFS processor, creates a separate file for each incoming flowfile from HDFS with the same name but different extension and finally write both the original HDFS file and the newly created file to a remote location via FTP. Which Nifi processor would be suitable for this task?
The flow i imagine is as following:
GetHDFS ----> *Nifi processor* --> PutFTP
Created 09-27-2017 02:54 PM
i figured GenerateFlowFile processor could be a solution here, so have 2 flows with same start and end point like:
GetHDFS --> PutFTP for the original HDFS files
GetHDFS --> GenerateFlowFile --> PutFTP for the newly created files based on the original HDFS ones
However the GUI wont let me connect the GetHDFS processor to GenerateFlowFile one, some input here would be really appreciated!
Created 09-27-2017 03:13 PM
/-------updateattrtibute(filename) ----putftp--- Try ---ListHDFS----fetchhdfs ---------------mergecontent-----putftp --- ---
Created on 09-27-2017 03:20 PM - edited 08-17-2019 11:47 PM
@Foivos A,
Generate flowfile processor won't accept any incoming connections,
Use one GetHDFS processor and connect same success relation to Two-PutFTP processor and keep the same file name.
The next success relation give to UpdateAttribute in that add another property to change file name of the flowfile.
First PutFTP is storing the HDFS files, storing as is into your remote location
Second PutFTP parsing the HDFS files and changing the name and storing to your remote location.
UpdateAttribute:-
Add new property to update attribute as below
${filename}.parsed
This update attribute changes the filename to samefilename(as we are having before) with .parsed as the extension to the filename by using this method you can easily find which file has been parsed and which is not parsed.
UpdataAttribute Configs:-
it's better to use List and Fetch HDFS processors to fetch the files from hdfs as listHDFS processor stores the state in it, it will catches the changes in the directory by it self.
***FYI***
if you are thinking to use ListHDFS and FetchHDFS processors then use the below link i have explained how to configure them
Flow:-
Hope this will helps ...!!!
Created on 09-27-2017 09:12 PM - edited 08-17-2019 11:47 PM
@Foivos A,
New_Flow:-
in this screenshot i have used GetHDFS processor but you can replace this with
ListHDFS--> FetchHDFS --> PutFTP(files without parsing)--> UpdateAttribute(files that got parsed) --> PutFtp
Only the files that got successfully stored into remote location will be routed to Update Attribute processor, you can parse and change the names of the files before storing to RemoteLocation(in second PutFTP) processor.
In this method we only process the files that got successfully stored into Remote Location by using First PutFTP processor(as we are using only success from FirstPutFTP to updateattribute).For all the other relations like rejected or failed you can auto terminate or you can retry them to store.
Note:-
we have looped failure and retry to same processor if some files are in this relations they will retry to same processor which will keep more load on the cluster.
Created 10-05-2017 03:06 PM
@Shu Your answer is really awesome and exactly what i would normally need, however there is one problem.
Due to connectivity and rerouting issues of the FTP server i am trying to transfer the files to, and because the PutFTP processor cannot be configured to handle these issues, i decided to use a command line ftp utility (ncftp in this case) that makes the FTP connection and transfer the files.
At first i thought to create a ListHDFS --> FetchHDFS --> ExecuteProcess flow, but the ExecuteProcess processor does not accept incoming connections. Note here that the ExecuteProcess processor by its own works like a charm if i configure it to execute either a bash script file with the ncftp commands in it, or just execute the ncftp utility with the correct arguments without the bash script.
So i replaced it with ExecuteStreamCommand processor that accepts incoming connections:
The configuration of the processor:
The correct command that works from the command line and the ExecuteProcess processor would be:
ncftpput -f /path/to/login.txt /path/to/ftp/remote /path/to/localdir/filename
But unfortunately the flow does not work and i get no error messages whatsoever to debug and find out what is possibly going wrong.
I ve also tried to set IgnoreSTDIN: true and specify the local path to the filenames i want to transfer explicitly, but this does not work either.
What i would want to do is list the HDFS contents, fetch the filenames and pass these filenames as arguments to the ExecuteStreamCommand processor as the last argument. In FetchHDFS processor there is the ${path}/${filename} HDFS filename attribute, and in the ExecuteStreamCommand processor ive set ${filename} as the last argument, but apparently the files are not stored somewhere locally in the Nifi server and they are lost somewhere in the way of the flow after the FetchHDFS processor. My question is how would you approach this issue? Any other ideas on how we could achieve the same result?
Created on 10-05-2017 11:12 PM - edited 08-17-2019 11:46 PM
Hi @Foivos A,
i tried a simple flow passing attributes to Execute stream command, sample shell script to list files in directory
bash# cat hadoop.sh for filename in `hadoop fs -ls ${1}${2} | awk '{print $NF}' | grep .txt$ #| tr '\n' ' '` do echo $filename; done
Run script:-
bash#./hadoop.sh /user/yashu/ folder2
/user/yashu/folder2/part1.txt
/user/yashu/folder2/part1_sed.txt
This script takes 2 arguments and passes those to ${1},${2} above and listed out files in folder2 directory.
When we are running this script in NiFi ExecuteStreamCommand processor configure that processor as
In my flow every flowfile having dir_name,folder_name as attributes as values like
dir_name having value as
/user/yashu/
folder_name having value as
folder2
Attributes for ff:-
processor configs:-
Make sure your NiFi run script is calling the original shell script(hadoop.sh) with 2 arguments as i mentioned below.
bash# cat hadoop_run.sh
/tmp/hadoop.sh $1$2
in my case i'm calling hadoop_run.sh from nifi which calls hadoop.sh script with two arguments as i mentioned above.
Then only the ExecuteStreamCommand processor takes 2 arguments as you have mentioned in Command Arguments property in our case ${dir_name};${folder_name} and passes those arguments to script.
How NiFi Executes Script:-
When NiFi executes hadoop_run.sh script it calls hadoop.sh script which accepts 2 arguments as we are passing those arguments from NiFi ${dir_name}will be considered as $1;${folder_name} considered as $2 and lists all the files
Created 10-06-2017 08:06 AM
Hi @Shu,
Thanks for the response, what do you mean with the Flowfiles attributes part, how do you pass the values to the arguments?
What i want to do is run a ncftpput command where the last argument in {} is the filenames from the FetchHDFS processor that are passed automatically to the ExecuteStreamCommand processor. The other arguments are fixed and we dont have to automate them, only the filename(s) to be transffered to the FTP server.
Command: ncftpput -f /path/to/login.txt /path/to/ftp/remote {HDFS_files}
So if the ListHDFS-->FetchHDFS flow outputs lets say for example 2 files that in the specified HDFS directory /HDFS/path/file1 & /HDFS/path/file2, how can we pass these as arguments to the ncftpput command in the ExecuteStreamCommand processor? The goal is to transfer the files in the HDFS dir to the FTP server with the ExecuteStreamCommand processor.
Created on 10-10-2017 03:13 PM - edited 08-17-2019 11:46 PM
Hi @Shu thanks for all the help, i am not sure i follow your logic here though:
The flow starts with the ListHDFS processor where the directory is specified in HDFS: e.g. /user/foivos
The FetchHDFS processor follows with the HDFS filename specification. Lets say we want to take only csv files so the HDFS filename property is ${path}/${filename:endsWith('csv')}
The final processor is the ExecuteStreamCommand where:
Created on 10-08-2017 06:07 PM - edited 08-17-2019 11:46 PM
Keep your ftp command in shell script and keep this shell script in local because you want to list HDFS files
Script1:-
ftp.sh
ncftpput -f /path/to/login.txt /path/to/ftp/remote${1}
Then call the above shell script1 from shell script2 . include $1 in script 2 because script 1 accepts one argument then only your script2 gets the arguments from nifi and calls script 1 with those arguments.
Script2:-
streamcommand_ftp.sh
<path-to-ftp.sh>/ftp.sh $1
then call streamcommand_ftp.sh from Executestreamcommand processor and specify Command Arguments property as your filename.