Created on 05-25-2016 04:30 PM - edited 08-29-2019 09:57 AM
In NiFi, the data passed between processors is called a FlowFile, and it can be accessed from several scripting languages in the ExecuteScript processor.
To access the data in a FlowFile, you first need to understand a few requirements.
In this example we will access the JSON data passed into the ExecuteScript processor from a GetTwitter processor.
This could be any data ingestion processor.
In the ExecuteScript processor, set the script type to python and enter the following code in the "script body" section. (Note: ExecuteScript uses the Jython libraries, which are good but limited. If you have a Python script that uses other libraries and produces output, you can use ExecuteProcess instead, which runs the script on the machine with the full Python libraries; its output becomes your FlowFile.)
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "Source": "NiFi",
            "ID": obj['id'],
            "Name": obj['user']['screen_name']
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
The import statements are required to take advantage of the NiFi components.
The way we actually access the flowfile is through a global variable available to you in NiFi called "session".
As you can see:
flowFile = session.get()
is how we grab the FlowFile from NiFi. We then pass it to the processing class via the session.write() method, which hands the class the input and output streams.
You can build up a new JSON object while referencing attributes from the source JSON as you can see in the script:
newObj = { "Source": "NiFi", "ID": obj['id'], "Name": obj['user']['screen_name'] }
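The object-building step can also be tried outside NiFi before pasting it into the processor. The following is a minimal sketch in standard Python; translate_tweet and the sample document are made up for illustration and are not part of the script above.

```python
import json


def translate_tweet(text):
    """Build the reduced JSON document from raw tweet JSON text.

    Hypothetical helper: it mirrors the body of ModJSON.process,
    minus the NiFi stream handling.
    """
    obj = json.loads(text)
    new_obj = {
        "Source": "NiFi",
        "ID": obj["id"],
        "Name": obj["user"]["screen_name"],
    }
    return json.dumps(new_obj, indent=4)


# Made-up sample input with only the fields the script reads.
sample = json.dumps({"id": 42, "user": {"screen_name": "nifi_user"}, "text": "hello"})
result = json.loads(translate_tweet(sample))
```

If a field such as user is missing from the incoming JSON, this raises a KeyError, and the same would happen inside the processor.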
This should help you get started with using python scripts inside of NiFi.
I hope to see some posts of how you modify this to create more interesting flows.
Created on 08-16-2016 07:26 PM - edited 08-17-2019 12:05 PM
Hi,
Thank you very much for the tutorial. I am new to NiFi and trying out some use cases. I followed your steps to write a Jython script that reads an XML file (RSS), converts it into a string, writes it to the outputStream, and routes it to PutFile. The problem I am facing is that the data is getting queued in the connection to ExecuteScript and is not getting into the processor. I am posting the code I wrote in the ExecuteScript script body below. Do I need to point it to a module directory?
import xml.etree.ElementTree as ET
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class xmlParser(StreamCallback):
    def __init__(self):
        pass

    def process(self,inputStream,outputStream):
        text= IOUtils.ToString(inputStream,StandardCharsets.UTF_8)
        xmlRoot = ET.fromstring(text);
        extracted_list=[]
        for elmnts in xmlRoot.fromall('item'):
            title= elmnts.find("title").text
            description = elmnts.find("description").text
            extracted_list.append(title)
            extracted_list.append(description)
        str_extract = ''.join(extracted_list)
        outputStream.write(bytearray(str_extract.encode('utf-8')))

flowFile = session.get()
if(flowFile!=None):
    flowFile = session.write(flowFile,xmlParser())
    flowFile = session.putAtrribute(flowFile, 'filename', 'rss_feed.xml')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
If you can help me with this, it would be great.
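One thing that may be worth checking: a few names in the script above differ from the ones in the original article. The article calls IOUtils.toString (lowercase t) and session.putAttribute, and the ElementTree method for finding matching elements is findall, not fromall. The parsing step can be checked outside NiFi with standard Python. The sketch below is only an assumption about what the script intends: extract_items and the sample feed are made up for illustration, and it joins the values with a newline where the script uses an empty string.

```python
import xml.etree.ElementTree as ET


def extract_items(xml_text):
    """Collect title and description text from every <item> element."""
    root = ET.fromstring(xml_text)
    parts = []
    # In an RSS document the <item> elements usually sit under <channel>,
    # so iter() is used to walk all descendants, not just direct children.
    for item in root.iter("item"):
        parts.append(item.findtext("title", default=""))
        parts.append(item.findtext("description", default=""))
    return "\n".join(parts)


# Made-up sample feed for illustration only.
sample_rss = (
    '<rss version="2.0"><channel><title>Feed</title>'
    "<item><title>A</title><description>first</description></item>"
    "<item><title>B</title><description>second</description></item>"
    "</channel></rss>"
)
extracted = extract_items(sample_rss)
```

Inside the processor, the returned string would be encoded and written to outputStream as in the article's script.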
Created on 08-17-2016 07:28 PM
That is strange. I see that you loop the failures back into the script processor. Can you delete that and route failure and success to the PutFile processor?
Created on 08-17-2016 11:09 PM
I did as you said but to no avail. It's the same. And, there's nothing populating the bulletin board for me to debug.
Created on 08-19-2016 08:49 AM
Hi
We are having trouble with this line: flowFile = session.write(flowFile, ModJSON())
javax.script.ScriptException: TypeError: write(): 1st arg can't be coerced to int , byte[] in <script> at line number 64
What do you think the problem is? Here is the script:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        endorsements = obj['endorsement'].split(',')
        categoryIds = obj['categoryIds'].split(',')
        orderItemIds = obj['orderItemIds'].split(',')
        seq_num = 0
        seq_num1 = 1
        for endorsement in endorsements:
            if seq_num == 0 and seq_num1 == len(endorsements):
                newObj = '[{"endorsement":' + endorsement \
                    + ',"eventId":"' + obj['eventId'] \
                    + '", "categoryId":"' + categoryIds[seq_num] \
                    + '", "createDate":"' + obj['createDate'] \
                    + '", "buyerId":"' + obj['buyerId'] \
                    + '", "channel":"' + obj['channel'] + '", "city":"' \
                    + obj['city'] + '", "orderItemId": "'+orderItemIds[seq_num]+'"}]'
                seq_num += 1
                seq_num1 += 1
            elif seq_num == 0:
                newObj = '[{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'] \
                    + '", "city":"' + obj['city'] \
                    + '", "orderItemId": "'+orderItemIds[seq_num]+'"},'
                seq_num += 1
                seq_num1 += 1
            elif seq_num1 == len(endorsements):
                newObj += '{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'] \
                    + '", "city":"' + obj['city'] \
                    + '", "orderItemId": "'+orderItemIds[seq_num]+'"}]'
                seq_num += 1
                seq_num1 += 1
            else:
                newObj += '{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'] \
                    + '", "city":"' + obj['city'] \
                    + '", "orderItemId": "'+orderItemIds[seq_num]+'"},'
                seq_num += 1
                seq_num1 += 1
        outputStream.write(bytearray(newObj.encode('utf-8')))
        flowFile = session.get()
        if flowFile != None:
            flowFile = session.write(flowFile, ModJSON())
            flowFile = session.putAttribute(flowFile, 'filename', flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
Created on 08-22-2016 04:47 PM
It looks like the session.get() block is inside a method of the class, so all the script does is define a class and exit. You need to move the indents so that the session.get() block is outside the class definition (no indent at all). That might fix the issue.
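Separately from the indentation, the hand-built JSON string in the script could probably be replaced by a list of dictionaries passed to json.dumps, which takes care of the brackets, commas, and quoting. This is only a sketch under assumptions: expand_endorsements and the sample values are made up, the endorsement value is emitted as a string rather than unquoted as in the script, and zip stops at the shortest list where the script would raise an IndexError.

```python
import json


def expand_endorsements(text):
    """Turn one event with comma-separated fields into a JSON list of records."""
    obj = json.loads(text)
    endorsements = obj["endorsement"].split(",")
    category_ids = obj["categoryIds"].split(",")
    order_item_ids = obj["orderItemIds"].split(",")

    records = []
    # One output record per endorsement, paired with the category and
    # order item at the same position.
    for endorsement, category_id, order_item_id in zip(
        endorsements, category_ids, order_item_ids
    ):
        records.append({
            "endorsement": endorsement,
            "eventId": obj["eventId"],
            "categoryId": category_id,
            "createDate": obj["createDate"],
            "buyerId": obj["buyerId"],
            "channel": obj["channel"],
            "city": obj["city"],
            "orderItemId": order_item_id,
        })
    return json.dumps(records)


# Made-up sample event for illustration only.
sample_event = json.dumps({
    "endorsement": "5,3", "categoryIds": "c1,c2", "orderItemIds": "o1,o2",
    "eventId": "e9", "createDate": "2016-08-19", "buyerId": "b7",
    "channel": "web", "city": "Istanbul",
})
records = json.loads(expand_endorsements(sample_event))
```

In the processor, the returned string would be encoded to UTF-8 and written to outputStream in place of newObj.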
Created on 09-01-2016 03:41 AM
Hi,
My flowFile seems to be null although it's being generated by a previous GetFile processor. A script like yours logs a NullPointerException on the session.transfer line because flowFile appears to be null. I modified the script to log the result, and the logs show that the flowFile is null. What workaround can I use?
flowFile = session.get()
if (flowFile != None):
    log.info('is not null') # This is not being logged
    flowFile = session.write(flowFile, ModJSON())
else:
    log.info('is null') # This is being logged
session.transfer(flowFile, REL_SUCCESS) # NullPointerException thrown here
session.commit()
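A possible explanation is that session.get() returns None when no FlowFile is available at the moment the script runs, in which case the transfer call has nothing to transfer. A common guard is to keep every use of flowFile inside the None check. The sketch below shows the control flow in standard Python so it can be run outside NiFi; FakeSession, run_once, and the REL_SUCCESS placeholder are hypothetical stand-ins, not NiFi classes.

```python
class FakeSession(object):
    """Hypothetical stand-in for the NiFi session, for testing outside NiFi."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Mirrors the behavior described above: None when nothing is queued.
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        if flow_file is None:
            raise ValueError("cannot transfer a null FlowFile")
        self.transferred.append((flow_file, relationship))


REL_SUCCESS = "success"  # placeholder for the relationship NiFi provides


def run_once(session):
    """Process at most one FlowFile; do nothing if none is available."""
    flow_file = session.get()
    if flow_file is None:
        return False
    # All work on the FlowFile, including the transfer, stays inside the guard.
    session.transfer(flow_file, REL_SUCCESS)
    return True


empty = FakeSession([])
busy = FakeSession(["flowfile-1"])
empty_result = run_once(empty)
busy_result = run_once(busy)
```

In the real script this corresponds to indenting the session.transfer line so it sits under the if branch.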
Created on 10-19-2016 02:26 AM
In the example above, the ExecuteScript processor shows an error and writes zero bytes.
GetTwitter (In: 0/0 bytes)
ExecuteScript (In: 0/0 bytes)
PutFile (In: 738/59.59 KB) How?
Is this example working?
Created on 03-23-2017 09:30 AM
Great article.
I am scraping Facebook page contents using Python and want to use the ExecuteScript processor to get all the posts returned by a Python function and pass them on to a Solr processor for indexing. Currently I write the contents returned by Facebook to a file; I want to put those contents on the output stream instead and pass them on to the next processor. Can you please share the steps, based on the example in your article?
Can I use the outputStream object in any Python function and use it for writing records? I don't think creating an inner class is mandatory. Also, does it allow writerow()-style functionality?
Appreciate showing me the way here. Thanks.
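On the writerow() question, one possible approach (not confirmed by the article) is to build the rows in memory with the csv module and hand the encoded bytes to outputStream.write inside the callback's process method. The sketch below is standard Python 3, and rows_to_bytes and the sample rows are made up for illustration. The Jython engine used by ExecuteScript is, as far as I know, based on Python 2.7, so it may need adjusting there, for example by using the StringIO module instead of io.StringIO.

```python
import csv
import io


def rows_to_bytes(rows):
    """Serialize an iterable of rows to UTF-8 encoded CSV bytes."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        writer.writerow(row)
    return buf.getvalue().encode("utf-8")


# Made-up sample rows for illustration only.
payload = rows_to_bytes([["id", "message"], [1, "hello, world"]])
```

Fields that contain the delimiter are quoted by the csv module automatically, which is the main advantage over joining strings by hand.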
Created on 02-09-2018 04:11 PM - edited 08-17-2019 12:04 PM
I am trying to create a data flow with NiFi using Python scripts. To begin the process I used an ExecuteProcess processor, and to execute the subsequent scripts I am trying to use ExecuteScript processors, because I need to build a dependency into my flow: I want the subsequent job to run only if the job it depends on executes successfully.
However, I see that the ExecuteProcess job runs successfully but the subsequent ExecuteScript jobs do not start. I also see that the relationship queue is empty. How can I make the flow run successfully?
Here are the screenshots for your reference:
Created on 04-27-2019 11:23 AM
Hello @Vasilis Vagias, some other tutorials state: "NOTE: ExecuteScript will perform a session.commit() at the end of each execution to ensure the operations have been committed. You do not need to (and should not) perform a session.commit() within the script." (Source: Link)
Do we really need the session.commit() statement?