
Execute Python script with NiFi?

Explorer

I have a NiFi cluster (1.1.2) and I have run into a problem trying to run a Python script from a processor.

I am building a pipeline that converts .avro files to another (scientific) data format. I need to convert these .avro files one by one with a Python script and save the script's output in some directory.

  1. I want my Avro files to be routed one by one to a Python processor in the cluster
  2. Then I need to execute a Python script on each file; the output of the script is another file.
  3. I want to store the newly converted files in HDFS

My Questions:

  1. How can I execute a Python script from NiFi?
  2. The Python script currently expects a file that it can read. What do I need to change so that it can access the binary input data from the flow file? The script depends on numpy, avro, and some other libraries to convert the files from Avro to my output format, in case that matters.
  3. How does my Python script tell NiFi whether the conversion succeeded?
  4. The Python script creates the output file in a local folder on the machine that runs it. After the script finishes, I need to read that file in its entirety and save it to HDFS. How can I do this? I am thinking of returning the newly created file's path as the output of the Python processor and then using the GetFile processor to read that file and save it into HDFS.

Thanks in advance!

1 ACCEPTED SOLUTION

Master Guru

1) You can execute a Python script (assuming you have Python installed locally) with ExecuteProcess or ExecuteStreamCommand. The former does not accept incoming connections, so your script would have to locate and use the Avro files on its own. The latter does accept incoming connections, so you can use ListFile -> FetchFile to get the Avro files into NiFi, then route those to ExecuteStreamCommand, where the contents will be available via STDIN to your script.
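
For illustration, here is a minimal sketch of the STDIN-reading side of such a script (assuming the Apache avro package is installed; the record handling is just a placeholder for the real conversion):

```python
import io
import sys

from avro.datafile import DataFileReader
from avro.io import DatumReader

# ExecuteStreamCommand pipes the incoming FlowFile content to the script's STDIN
raw = sys.stdin.buffer.read()          # on Python 2, use sys.stdin.read()

# The Avro reader needs a seekable file-like object, so wrap the bytes in BytesIO
reader = DataFileReader(io.BytesIO(raw), DatumReader())
records = [r for r in reader]          # placeholder: convert each record here
reader.close()

sys.stderr.write("parsed %d Avro records\n" % len(records))
```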

2) If your script takes paths to filenames, you can forgo the FetchFile processor and just send the results of ListFile to ExecuteStreamCommand. Or you could update your script to take STDIN as described above.
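
A sketch of that variant (the Command Arguments value and attribute names are only an example of how ListFile's output might be passed in; adjust them to your flow):

```python
import sys

# With ExecuteStreamCommand's Command Arguments set to something like
# ${absolute.path}/${filename} (attributes written by ListFile), the script
# receives the path on the command line instead of the content on STDIN.
avro_path = sys.argv[1]

with open(avro_path, "rb") as f:
    raw = f.read()                     # placeholder: run the real conversion on these bytes

sys.stderr.write("read %d bytes from %s\n" % (len(raw), avro_path))
```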

3) ExecuteStreamCommand writes an "execution.status" attribute containing the exit code for your script, so you can return > 0 for error condition(s) and use RouteOnAttribute to send the flow files down various paths for error-handling and what-not.
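
For example, the script can signal failure with a non-zero exit code (convert() below is a hypothetical stand-in for the real work):

```python
import sys

def convert(data):
    # hypothetical stand-in for the real Avro -> scientific-format conversion
    return data

try:
    result = convert(sys.stdin.buffer.read())   # sys.stdin.read() on Python 2
except Exception as e:
    sys.stderr.write("conversion failed: %s\n" % e)
    sys.exit(1)   # execution.status becomes "1"; route this to error handling
sys.exit(0)       # execution.status becomes "0"
```

A RouteOnAttribute property such as ${execution.status:equals('0')} could then separate successes from failures.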

4) Alternatively, rather than going back out to the file system, if you can (in your script) write the contents of the file to STDOUT instead of a file, then you can route ExecuteStreamCommand directly to PutHDFS. This keeps the file contents "under management" by NiFi, which is great for provenance/lineage, since you will be able to use NiFi to see what the contents look like at each step of the flow.
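
A sketch of that approach, with the conversion itself left as a placeholder:

```python
import sys

def convert(avro_bytes):
    # placeholder: return the converted bytes in the target scientific format
    return avro_bytes

# Whatever the script writes to STDOUT becomes the content of the outgoing
# FlowFile, which can then be routed straight to PutHDFS
data = sys.stdin.buffer.read()          # on Python 2, sys.stdin.read()
sys.stdout.buffer.write(convert(data))  # on Python 2, sys.stdout.write(...)
```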

If (as you described) you want to write to an output file, then after the script is finished writing the file, you can write out the path to the file to STDOUT and choose an Output Destination Attribute of something like "absolute.path.to.file" in ExecuteStreamCommand. Then you can route to FetchFile and set the File To Fetch to "${absolute.path.to.file}", then a PutHDFS processor to land the file in Hadoop.
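
A sketch of the file-on-disk variant (the /tmp/converted directory and the attribute name are just examples):

```python
import os
import sys
import uuid

def convert(avro_bytes):
    # placeholder: return the converted bytes in the target format
    return avro_bytes

out_dir = "/tmp/converted"              # example output directory
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)
out_path = os.path.join(out_dir, str(uuid.uuid4()) + ".dat")

with open(out_path, "wb") as f:
    f.write(convert(sys.stdin.buffer.read()))

# With Output Destination Attribute set to e.g. "absolute.path.to.file",
# this printed path ends up in that attribute for FetchFile to use
sys.stdout.write(out_path)
```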

Just for completeness (this won't apply to you, as you're using numpy, which is a CPython module): if you are using "pure" Python modules (meaning their code and dependencies are all Python and do not use natively compiled modules -- no CPython or .so/.dll, etc.), you can use ExecuteScript or InvokeScriptedProcessor. These use a Jython (not Python) engine to execute your script. These processors give you quite a bit more flexibility (albeit with some more boilerplate code), as you can work with flow file attributes, content, controller services, and other NiFi features from inside your Python code. However, the tradeoff (as mentioned) is needing pure Python modules/scripts.
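
For a rough idea of what that ExecuteScript (Jython) style looks like, here is a sketch that simply upper-cases the FlowFile content as a stand-in for real processing; the session and REL_SUCCESS variables are provided by the processor itself:

```python
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from java.nio.charset import StandardCharsets

class TransformCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        # read the incoming FlowFile content, transform it, write it back out
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(bytearray(text.upper().encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())
    session.transfer(flowFile, REL_SUCCESS)
```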


6 REPLIES

Explorer

Thanks for this in-depth answer; it resolved all my questions. Loving this community so far!

For anyone running into the same problem, I just stumbled upon the following link which explains the use of ExecuteStreamCommand with examples: https://pierrevillard.com/2016/03/09/transform-data-with-apache-nifi/

New Contributor

I also found this and other answers from Matt very useful. Thank you.

I have a legacy collection of Python scripts that processes binary files in a workflow that I would like to speed up considerably, and I am curious about the possibility of doing this on a compute cluster with NiFi. I would like to use these scripts to extract data from the binary files and then ingest the data into a suitable store such as Spark SQL or Hive for machine learning.

Right now the ExecuteScript processor looks like the most suitable one to run my Python scripts in, but after some time working with it I am still having various challenges getting things to work.

Some particulars:

  1. My scripts use Python 2.
  2. I installed the Python libraries into the HWX HDP sandbox environment using Anaconda (/usr/local/anaconda2/).
  3. I normally run my code from within a virtualenv, so that I can maintain the Python dependencies for my project without affecting those needed by the operating system.
  4. I need to import various Python modules for my scripts to work, and I am not sure whether all of them are pure Python. Here is a list of things I typically import: commands, database, datetime, java.io, java.nio.charset, math, multiprocessing, mysql.connector, MySQLdb, numpy, org.apache.commons.io, org.apache.nifi.processor.io, os, pstats, scipy, sys, time, and there might be more coming.

My Questions:

  1. Is there an additional overhead/speed penalty incurred (and how much) when running Python scripts in the ExecuteScript processor (as opposed to running natively outside NiFi) through the Jython engine?
  2. Are there any plans to have a non-Jython engine to execute the Python scripts, or any other workarounds for cases where non-pure Python modules might need to be imported?
  3. How could I let my Python scripts inside ExecuteScript use the correct version of Python?

Thank you!

Contributor

Hi @Alexander Polson, did you end up finding answers to your questions?

Explorer

Hi Matt,

I am trying to convert a series of XML files to CSV files and save them on HDFS. I wrote a script that does the conversion, and now I would like to use the ExecuteStreamCommand processor. I tried to follow your explanation, using ListFile -> ExecuteStreamCommand -> FetchFile -> PutHDFS. It works well; however, the name of the FlowFile is not updated after the FetchFile processor (I know FetchFile was never designed to change the FlowFile's name). So the converted files that are saved on HDFS still end in .xml instead of .csv. My question is: how can I update the name of the FlowFile? Should I add another ExecuteStreamCommand for renaming? Thanks

Master Guru

After ExecuteStreamCommand, you'll want an UpdateAttribute processor to set "filename" to "${filename}.csv", or a slightly more complicated expression if you are trying to replace the .xml extension with .csv rather than just appending .csv.
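
For example (just a sketch of the UpdateAttribute configuration), a dynamic property named filename with a value along the lines of:

```
${filename:substringBeforeLast('.xml')}.csv
```

would strip a trailing .xml and append .csv before the file reaches PutHDFS.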