Created on 05-25-2016 04:30 PM - edited 08-29-2019 09:57 AM
In NiFi, the data passed between processors is called a FlowFile, and it can be accessed from several scripting languages in the ExecuteScript processor.
To access the data in a FlowFile, you need to understand a few requirements first.
In this example we will access the JSON data passed into the ExecuteScript processor from a GetTwitter processor.
Any other data ingestion processor would work as well.
Note: ExecuteScript runs Python through Jython, whose libraries are good but limited. If your Python script uses other libraries and produces output, you can use ExecuteProcess instead, which runs the script on the machine with the full Python installation; the output becomes your FlowFile.
In the ExecuteScript processor, set the script engine to python and enter the following code in the "Script Body" property:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the incoming FlowFile content as UTF-8 text and parse it as JSON
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Build the new, smaller JSON object from fields of the source JSON
        newObj = {
            "Source": "NiFi",
            "ID": obj['id'],
            "Name": obj['user']['screen_name']
        }
        # Replace the FlowFile content with the new JSON
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))


# session.get() returns None when no FlowFile is queued, so check before using it
flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
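A note on the filename expression in the script: split('.')[0] keeps only the text before the first dot, so any further dots in the original name are lost. The following is a minimal sketch in plain Python, runnable outside NiFi, that compares this with os.path.splitext, which strips only the last extension. Both helper names are hypothetical and are not part of NiFi or of the original script.

```python
import os.path


def translated_name_first_dot(filename):
    # Same expression as in the script: keep everything before the first dot.
    return filename.split('.')[0] + '_translated.json'


def translated_name_last_ext(filename):
    # Alternative (an assumption, not what the original script does):
    # strip only the final extension and keep earlier dots.
    base, _ext = os.path.splitext(filename)
    return base + '_translated.json'


print(translated_name_first_dot('tweets.2016.json'))
print(translated_name_last_ext('tweets.2016.json'))
```

Which one you want depends on how your incoming filenames are built; for names with a single extension the two behave the same.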
The import statements are required to use the Java and NiFi classes the script relies on.
The FlowFile itself is accessed through a variable called "session", which NiFi makes available to the script.
As you can see:
flowFile = session.get()
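is how we fetch the next FlowFile from NiFi (it returns None if nothing is queued). We then pass the FlowFile to the processing class via the session.write() method, which hands the class the input and output streams for the FlowFile content.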
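Because session.get() can return None, it helps to check the result before transferring anything. The sketch below shows that control flow in plain Python so it can be tried outside NiFi. FakeSession and run_once are hypothetical stand-ins written for this illustration; they are not NiFi classes and only mimic the two calls used here.

```python
class FakeSession(object):
    """Hypothetical stand-in for NiFi's session, for local testing only."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Mimics session.get(): None when nothing is waiting in the queue.
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        # Records the transfer instead of routing a real FlowFile.
        self.transferred.append((flow_file, relationship))


def run_once(session, rel_success='success'):
    # One script invocation: only touch the FlowFile if we actually got one.
    flow_file = session.get()
    if flow_file is not None:
        session.transfer(flow_file, rel_success)
        return True
    return False


session = FakeSession(['flowfile-1'])
print(run_once(session))  # a FlowFile was queued
print(run_once(session))  # queue is now empty, nothing is transferred
```

Inside NiFi you would use the real session and REL_SUCCESS that ExecuteScript provides; only the guard is the point of this sketch.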
You can build a new JSON object from fields of the source JSON, as this part of the script shows:
newObj = { "Source": "NiFi", "ID": obj['id'], "Name": obj['user']['screen_name'] }
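To try the mapping outside NiFi first, the same logic can be pulled into a small plain-Python function. This is only a sketch: build_summary is a hypothetical helper name, and the sample input is made up to contain just the fields the script reads ('id' and 'user' / 'screen_name').

```python
import json


def build_summary(text):
    # Parse the source JSON and keep only the fields we care about.
    obj = json.loads(text)
    new_obj = {
        "Source": "NiFi",
        "ID": obj['id'],
        "Name": obj['user']['screen_name'],
    }
    return json.dumps(new_obj, indent=4)


# Made-up sample input with the fields the mapping expects.
sample = '{"id": 42, "user": {"screen_name": "example_user"}, "text": "hello"}'
print(build_summary(sample))
```

Note that json.loads raises a ValueError when the input is not valid JSON, and the lookups raise a KeyError when 'id' or 'user' is missing, so the incoming content needs to match the structure the mapping expects.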
This should help you get started with using Python scripts inside NiFi.
I hope to see some posts of how you modify this to create more interesting flows.
Created on 06-13-2019 10:34 AM
Thank you @Vasilis Vagias
Created on 03-11-2022 04:08 AM
Hi, this isn't working as-is on NiFi 1.14. Can you give me a hand, please?
I used a GenerateFlowFile processor with some random text and connected it to ExecuteScript, but I get the following:
ExecuteScript[id=78c5739f-017f-1000-0000-0000016ca301] ExecuteScript[id=78c5739f-017f-1000-0000-0000016ca301] failed to process due to javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25; rolling back session: java.lang.NullPointerException
↳ causes: Traceback (most recent call last):
  File "<script>", line 25, in <module>
java.lang.NullPointerException
java.lang.NullPointerException: java.lang.NullPointerException
↳ causes: javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25
↳ causes: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: java.lang.NullPointerException: java.lang.NullPointerException in <script> at line number 25