Created 06-08-2017 08:48 AM
I have a NiFi cluster (1.1.2) and I ran into a problem with a Python processor.
I am building a pipeline that converts .avro files to another (scientific) data format. I need to convert these .avro files one by one with a Python script and save the output of my Python script in some directory.
My Questions:
Thanks in advance!
Created 06-08-2017 01:59 PM
1) You can execute a Python script (assuming you have Python installed locally) with ExecuteProcess or ExecuteStreamCommand. The former does not accept incoming connections, so your script would have to locate and use the Avro files on its own. The latter does accept incoming connections, so you can use ListFile -> FetchFile to get the Avro files into NiFi, then route those to ExecuteStreamCommand, where the contents will be available via STDIN to your script.
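A rough sketch of what the STDIN-driven script might look like (assuming Python 3 and the fastavro package; convert_record is a placeholder for your own conversion logic):

```python
#!/usr/bin/env python3
# Invoked by ExecuteStreamCommand: the flow file content (one .avro file) arrives on STDIN,
# and whatever is written to STDOUT becomes the outgoing flow file content.
import sys
from fastavro import reader  # assumption: any Avro reader library would do

def convert_record(record):
    # Placeholder for the real Avro-to-scientific-format conversion
    return (str(record) + "\n").encode("utf-8")

avro_reader = reader(sys.stdin.buffer)        # read Avro records from the flow file content
for record in avro_reader:
    sys.stdout.buffer.write(convert_record(record))
```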
2) If your script takes paths to filenames, you can forgo the FetchFile processor and just send the results of ListFile to ExecuteStreamCommand. Or you could update your script to take STDIN as described above.
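If you go the path-passing route, a sketch of the script side (assuming you set ExecuteStreamCommand's Command Arguments to something like ${absolute.path}/${filename} -- check whether absolute.path already ends in a separator in your version -- so the listed path arrives as the first argument):

```python
#!/usr/bin/env python3
# Invoked by ExecuteStreamCommand with the listed file path as the first argument.
import sys

def convert_file(in_path, out_path):
    # Placeholder for your real conversion logic
    pass

in_path = sys.argv[1]
out_path = in_path.rsplit(".avro", 1)[0] + ".out"   # example output naming only
convert_file(in_path, out_path)
print(out_path)   # optionally echo the output path for downstream use (see point 4)
```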
3) ExecuteStreamCommand writes an "execution.status" attribute containing the exit code for your script, so you can return > 0 for error condition(s) and use RouteOnAttribute to send the flow files down various paths for error-handling and what-not.
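For example (a minimal sketch; the convert function is a stand-in for your own logic), exit non-zero on failure in the script:

```python
import sys

def convert(src, dst):
    # Placeholder for the real conversion
    dst.write(src.read())

try:
    convert(sys.stdin.buffer, sys.stdout.buffer)
except Exception as exc:
    sys.stderr.write("conversion failed: %s\n" % exc)
    sys.exit(1)   # ExecuteStreamCommand records this as execution.status = 1
```

Then in RouteOnAttribute, a property such as "failed" with the value ${execution.status:gt(0)} would split failures onto their own relationship.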
4) Alternatively, rather than going back out to the file system, if you can (in your script) write the contents of the file to STDOUT instead of a file, then you can route ExecuteStreamCommand directly to PutHDFS. This keeps the file contents "under management" by NiFi, which is great for provenance/lineage, since you will be able to use NiFi to see what the contents look like at each step of the flow.
If (as you described) you want to write to an output file, then after the script is finished writing the file, you can write out the path to the file to STDOUT and choose an Output Destination Attribute of something like "absolute.path.to.file" in ExecuteStreamCommand. Then you can route to FetchFile and set the File To Fetch to "${absolute.path.to.file}", then a PutHDFS processor to land the file in Hadoop.
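A sketch of that variant (the output directory and naming are assumptions for illustration):

```python
#!/usr/bin/env python3
# ExecuteStreamCommand: content arrives on STDIN; the path of the written output file
# is echoed on STDOUT so NiFi can capture it into an attribute
# (Output Destination Attribute = absolute.path.to.file).
import os
import sys
import uuid

OUT_DIR = "/data/converted"   # assumption: adjust to your environment

data = sys.stdin.buffer.read()                       # the incoming .avro content
out_path = os.path.join(OUT_DIR, "%s.out" % uuid.uuid4())
with open(out_path, "wb") as f:
    f.write(data)                                    # placeholder: write converted output here

sys.stdout.write(out_path)                           # becomes the absolute.path.to.file attribute
```

FetchFile's File to Fetch would then be set to ${absolute.path.to.file} as described above.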
Just for completeness (this won't apply to you, as you're using numpy, which is a CPython module): if you are using "pure" Python modules (meaning their code and dependencies are all Python and do not use natively compiled modules -- no CPython or .so/.dll, etc.), you can use ExecuteScript or InvokeScriptedProcessor. These use a Jython (not Python) engine to execute your script. These processors give you quite a bit more flexibility (albeit with some more boilerplate code), as you can work with flow file attributes, content, controller services, and other NiFi features from inside your Python code. However, the tradeoff (as mentioned) is needing pure Python modules/scripts.
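For comparison, a minimal ExecuteScript (Jython engine) skeleton; the upper-casing transform is just a stand-in for real logic:

```python
# Jython script body for ExecuteScript (pure-Python logic only).
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class TransformCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Placeholder transform: replace with your own conversion
        outputStream.write(bytearray(text.upper().encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())
    flowFile = session.putAttribute(flowFile, 'converted.by', 'jython-script')
    session.transfer(flowFile, REL_SUCCESS)
```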
Created 06-08-2017 02:56 PM
Thanks for this in-depth answer, it resolved all my questions. Loving this community so far!
For anyone running into the same problem, I just stumbled upon the following link which explains the use of ExecuteStreamCommand with examples: https://pierrevillard.com/2016/03/09/transform-data-with-apache-nifi/
Created 10-13-2017 03:19 PM
I also found this and other answers from Matt very useful. Thank you.
I have a legacy Python script collection that processes binary files along a workflow that I would like to speed up considerably, and I am curious about the possibility of doing this on a compute cluster with NiFi. I would like to use these scripts to extract data from the binary files and then ingest the data into a suitable store such as Spark SQL or Hive for machine learning.
Right now the ExecuteScript processor looks the most suitable for running my Python scripts, but after some time working with it I am still having various challenges getting things to work.
Some particulars:
My Questions:
Thank you!
Created 03-24-2018 11:56 PM
Hi @Alexander Polson, did you end up finding answers to your questions?
Created 11-30-2017 02:13 AM
Hi Matt,
I am trying to convert a series of XML files to CSV files and save them on HDFS. I wrote a script that does the conversion, and now I would like to use the ExecuteStreamCommand processor. I tried to follow your explanation and set up ListFile -> ExecuteStreamCommand -> FetchFile -> PutHDFS. It works well; however, the name of the FlowFile is not updated after the FetchFile processor, which I understand FetchFile was never designed to change. So the converted files saved on HDFS still end in .xml instead of .csv. My question is: how can I update the name of the FlowFile? Should I add another ExecuteStreamCommand for renaming? Thanks
Created 11-30-2017 06:27 PM
After ExecuteStreamCommand, you'll want an UpdateAttribute processor to set "filename" to "${filename}.csv", or a slightly more complicated expression if you are trying to replace the .xml extension with .csv rather than just appending it.
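For example, setting the UpdateAttribute property "filename" to something like ${filename:substringBeforeLast('.xml')}.csv should strip the .xml extension before appending .csv, since substringBeforeLast returns everything before the last occurrence of its argument (standard Expression Language).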