Can anyone provide an example of a Python script executed through ExecuteStreamCommand?

Contributor

Although it may be obvious, the processor should do the following:

  • Call a Python script
  • Supply the FlowFile to the Python script
  • Read the FlowFile from within the Python script
  • Update the original FlowFile, or create a new FlowFile, from within the Python script
  • Output the updated or new FlowFile back into NiFi

Original question (without any responses)

Any pointers/advice/help is appreciated

1 ACCEPTED SOLUTION

@Vincent van Oudenhoven

Here is a very elementary flow that demonstrates it using the ExecuteStreamCommand processor.

The flow looks like this:

[Screenshot: 64862-screen-shot-2018-03-27-at-15009-am.png]

In the GenerateFlowFile processor, I am generating a flow file with the sample text "foobar":

[Screenshot: 64863-screen-shot-2018-03-27-at-15034-am.png]

In the ExecuteStreamCommand processor, I refer to my Python code like this:

[Screenshot: 64864-screen-shot-2018-03-27-at-15105-am.png]

The sample.py is as simple as this:

[Screenshot: 64865-screen-shot-2018-03-27-at-15143-am.png]

And now the content of the flow file looks like this:

[Screenshot: 64866-screen-shot-2018-03-27-at-15128-am.png]

If you want to access the content of the existing flow file, the script can read it from standard input: ExecuteStreamCommand streams the incoming flow file's content to the command's STDIN unless Ignore STDIN is set to true. I would avoid converting the content to an attribute just to pass it along, since attributes are kept in memory, and a very large attribute value or a large number of attributes can adversely affect performance.
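
As a minimal sketch of a script that ExecuteStreamCommand could call: it assumes the processor's default behavior, as I understand its documentation, of piping the incoming flow file's content to the script's standard input and taking whatever the script writes to standard output as the content of the outgoing flow file. The `transform` function and the upper-casing are hypothetical stand-ins for real logic, and the sketch assumes text content.

```python
import sys


def transform(text):
    """Hypothetical transformation: upper-case the incoming content."""
    return text.upper()


def main():
    # Assumption: the incoming flow file content arrives on standard input.
    incoming = sys.stdin.read()
    # Assumption: whatever is written to standard output becomes the
    # content of the outgoing flow file.
    sys.stdout.write(transform(incoming))


if __name__ == "__main__":
    main()
```

Outside NiFi, the same script can be checked by piping text into it on the command line, which keeps the content out of flow file attributes entirely.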


Let me know if that helps!

6 REPLIES

@Vincent van Oudenhoven

Is there a specific reason for using ExecuteStreamCommand for this use case? I would recommend the ExecuteScript or InvokeScriptedProcessor processor, with which you can perform all the operations listed in your question.

For example, a very basic script for the ExecuteScript processor is the following, which reads a flow file and creates an empty flow file with all of its attributes.

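# Jython script body for ExecuteScript; session and REL_SUCCESS are provided by the processor
flowFile = session.get()
if flowFile is not None:
    # Copy the attributes before removing the incoming flow file
    attrMap = flowFile.getAttributes()
    session.remove(flowFile)
    # Create an empty flow file that carries the same attributes
    newflowFile = session.create()
    newflowFile = session.putAllAttributes(newflowFile, attrMap)
    session.transfer(newflowFile, REL_SUCCESS)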

Or this Groovy script for an ExecuteScript processor, which reads the content of your flow files and routes them to downstream connections accordingly.

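import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import java.nio.charset.StandardCharsets

flowFile = session.get()
if (!flowFile) return

def storeID = 0
// Read the content and take the third pipe-delimited field as the store ID
session.read(flowFile, { inputStream ->
  def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  storeID = text.tokenize('|')[2].trim().toInteger()
} as InputStreamCallback)

// Route by store ID range: 1 to 10 go to success
if (storeID >= 1 && storeID <= 10)
  session.transfer(flowFile, REL_SUCCESS)
else
  // 11 to 20 in this example; any other value is also routed here so that no flow file is left untransferred
  session.transfer(flowFile, REL_FAILURE)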

You can also have an external script executed using ExecuteStreamCommand, but why maintain code outside NiFi when the built-in flow file handling of a processor like ExecuteScript lets you achieve the same functionality more easily?

Contributor

@Rahul Soni

Thank you for your reply. I want to use ExecuteStreamCommand instead of ExecuteScript because Jython is not an option for me. I am running a wide array of Python commands, and they need to be executed under a particular Anaconda environment.

I can't seem to find any solid examples of ExecuteStreamCommand. Would you mind providing an example or pointing me in the right direction?

Edit: just to add, ExecuteProcess is also not an option as it does not allow for an incoming FlowFile.


@Vincent van Oudenhoven Does that help?

@Vincent van Oudenhoven

Did the answer help resolve your query? If so, please close the thread by marking the answer as Accepted!

Contributor

It did indeed help. I also found the following StackOverflow answer helpful: https://stackoverflow.com/questions/49467969/python-script-using-executestreamcommand

Especially:

Command Arguments: any flags or args, delimited by ; (e.g. /path/to/my_script.py)

Command Path: /path/to/python3

Note the Command Path, which you did not specify in the processor. Setting it to a specific interpreter also makes it possible to use, for example, a predefined Anaconda environment.
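
To try a Command Path and Command Arguments combination outside NiFi first, the invocation can be approximated in Python. This is only a sketch under assumptions: the helper name `run_like_execute_stream_command` is hypothetical, it assumes the arguments are split on `;` and the flow file content is piped to the command's standard input, and it uses the current interpreter with an inline one-liner as a stand-in for /path/to/python3 and /path/to/my_script.py.

```python
import subprocess
import sys


def run_like_execute_stream_command(command_path, command_arguments,
                                    content, delimiter=";"):
    """Hypothetical helper: run one executable with delimited arguments,
    feeding `content` (bytes) to its stdin and capturing its stdout."""
    # Split the argument string on the delimiter, dropping empty pieces.
    # Note that an argument containing the delimiter itself would be split too.
    args = [a for a in command_arguments.split(delimiter) if a]
    result = subprocess.run(
        [command_path] + args,
        input=content,            # bytes piped to the child's stdin
        stdout=subprocess.PIPE,   # captured as the would-be output content
        check=False,              # report the exit status instead of raising
    )
    return result.returncode, result.stdout


if __name__ == "__main__":
    # Stand-in for Command Path /path/to/python3 and Command Arguments
    # /path/to/my_script.py: run an inline one-liner that upper-cases stdin.
    status, out = run_like_execute_stream_command(
        sys.executable,
        "-c;print(input().upper())",
        b"foobar",
    )
    print(status, out.decode().strip())
```

Pointing `command_path` at the Python binary inside an Anaconda environment would run the script with that environment's interpreter, which is the same idea as setting Command Path in the processor.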

Anyhow, thank you for the help!