Can anyone provide an example of a python script executed through ExecuteStreamCommand?

Perhaps obviously, the processor should be able to do the following:

  • Call a Python script
  • Supply the FlowFile to the Python script
  • Read the FlowFile from within the Python script
  • Update the original FlowFile or create a new FlowFile from within the Python script
  • Output the updated/new FlowFile back into NiFi

Original question (without any responses)

Any pointers/advice/help is appreciated

1 ACCEPTED SOLUTION

@Vincent van Oudenhoven

Here is a very basic flow that demonstrates this using the ExecuteStreamCommand processor.

The flow looks like this:

64862-screen-shot-2018-03-27-at-15009-am.png

In the GenerateFlowFile processor, I am generating a flow file with the sample text "foobar":

64863-screen-shot-2018-03-27-at-15034-am.png

In the ExecuteStreamCommand processor, I refer to my Python code like this:

64864-screen-shot-2018-03-27-at-15105-am.png

The sample.py is as simple as this:

64865-screen-shot-2018-03-27-at-15143-am.png

And now the content of the flow file looks like this:

64866-screen-shot-2018-03-27-at-15128-am.png

However, if you want to access the content of the existing flow file, the script can read it from standard input: ExecuteStreamCommand passes the flow file content to the command's stdin unless Ignore STDIN is set to true. Converting the content to an attribute is possible too, but it can have consequences, since attributes are kept in memory, and a very large attribute value or a large number of attributes can adversely affect performance.
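
A minimal sketch of a script that reads the flow file content from standard input and writes the new content to standard output, assuming ExecuteStreamCommand wires the streams that way (Ignore STDIN left at false). The `process` function and its upper-casing are made up for illustration; this is not the sample.py from the screenshot:

```python
import io


def process(src, dst):
    """Read the whole input stream, transform it, and write the result."""
    # Assumption: the flow file content arrives on the input stream.
    content = src.read()
    # Hypothetical transformation, for illustration only.
    dst.write(content.upper())


if __name__ == "__main__":
    # Local demo: in-memory streams stand in for the processor's pipes.
    out = io.StringIO()
    process(io.StringIO("foobar"), out)
    print(out.getvalue())
```

In a real script run by the processor, the demo at the bottom would presumably be replaced by `import sys` and a call to `process(sys.stdin, sys.stdout)`.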


Let me know if that helps!

6 REPLIES

@Vincent van Oudenhoven

Is there any specific reason for using ExecuteStreamCommand for this use case? I would recommend using the ExecuteScript or InvokeScriptedProcessor processor instead; either can perform all the operations listed in your question.

For example, here is a very basic script for the ExecuteScript processor that takes an incoming flow file and creates an empty flow file carrying all of its attributes:

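# Jython script for ExecuteScript: replace the incoming flow file
# with an empty one that carries the same attributes.
flowFile = session.get()
if flowFile is not None:
    attrMap = flowFile.getAttributes()
    session.remove(flowFile)
    newflowFile = session.create()
    newflowFile = session.putAllAttributes(newflowFile, attrMap)
    session.transfer(newflowFile, REL_SUCCESS)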

Or this Groovy script for the ExecuteScript processor, which reads the content of your flow files and routes them to downstream connections accordingly:

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import java.nio.charset.StandardCharsets

flowFile = session.get()
if (!flowFile) return

def storeID = 0
// Read the content and take the third "|"-delimited field as the store ID
session.read(flowFile, { inputStream ->
    def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    storeID = text.tokenize("|")[2]?.trim() as Integer
} as InputStreamCallback)

if (storeID >= 1 && storeID <= 10)
    session.transfer(flowFile, REL_SUCCESS)
else
    // 11-20 in the original example; any other value also goes here
    // so that no flow file is left untransferred
    session.transfer(flowFile, REL_FAILURE)

You can also have an external script executed using ExecuteStreamCommand, but why maintain code outside NiFi when the built-in flow file handling in a processor like ExecuteScript lets you achieve the same functionality more easily?

@Rahul Soni

Thank you for your reply. The reason I want to use ExecuteStreamCommand instead of ExecuteScript is that Jython is not an option for me. I am running a wide array of Python commands, and they need to be executed under a particular Anaconda environment.

I can't seem to find any solid examples of ExecuteStreamCommand, would you mind providing an example or pointing me in the right direction?

Edit: just to add, ExecuteProcess is also not an option as it does not allow for an incoming FlowFile.

@Vincent van Oudenhoven Does that help?

@Vincent van Oudenhoven

Did the answer help resolve your query? If so, please close the thread by marking the answer as Accepted!

It did indeed help. I also found the following Stack Overflow answer helpful: https://stackoverflow.com/questions/49467969/python-script-using-executestreamcommand

Especially:

Command Arguments: any flags or args, delimited by ; (i.e. /path/to/my_script.py)

Command Path: /path/to/python3

Note the Command Path, which you did not specify in the processor. Pointing it at a specific interpreter also makes it possible to use, for example, a predefined Anaconda environment.
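
To check a script outside NiFi, the Command Path / Command Arguments split can be mimicked with `subprocess`. This is only a sketch of my understanding of the invocation (interpreter as the command, `;`-delimited arguments, flow file content on stdin, stdout as the new content), not NiFi's actual implementation. `sys.executable` stands in for /path/to/python3, and the throwaway script and the helper names are hypothetical:

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in for /path/to/my_script.py: upper-cases its stdin.
SCRIPT = "import sys\nsys.stdout.write(sys.stdin.read().upper())\n"


def run_like_processor(command_path, command_arguments, content):
    """Run command_path with ';'-delimited arguments, feeding content on stdin."""
    args = [arg for arg in command_arguments.split(";") if arg]
    result = subprocess.run(
        [command_path, *args],
        input=content,
        capture_output=True,
        text=True,
        check=True,  # raise if the script exits with a non-zero status
    )
    return result.stdout


def demo():
    """Write the throwaway script to a temp directory and run it on "foobar"."""
    with tempfile.TemporaryDirectory() as tmp:
        script_path = os.path.join(tmp, "my_script.py")
        with open(script_path, "w") as handle:
            handle.write(SCRIPT)
        # sys.executable plays the role of Command Path (/path/to/python3).
        return run_like_processor(sys.executable, script_path, "foobar")


if __name__ == "__main__":
    print(demo())
```

Swapping `sys.executable` for the Python binary inside an Anaconda environment should run the same script with that environment's packages, which is the point of setting Command Path explicitly.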

Anyhow, thank you for the help!
