Guru

In NiFi, the data passed between processors is referred to as a FlowFile, and it can be accessed from various scripting languages in the ExecuteScript processor.

In order to access the data in the FlowFile you need to understand a few requirements first.

In this example we will access the JSON data passed into the ExecuteScript processor from a GetTwitter processor.

This could be any data ingestion processor.

4528-screen-shot-2016-05-25-at-103502-am.png

In the ExecuteScript processor, set the Script Engine to python and enter the following code in the "Script Body" property. (Note: ExecuteScript runs Python through Jython, whose libraries are good but limited. If you have a Python script that uses other libraries and produces output, you can use ExecuteProcess instead, which runs the script on the machine with the full Python installation; its output becomes your FlowFile.)

4530-screen-shot-2016-05-25-at-104804-am.png

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Callback that reads the incoming tweet JSON and writes a reduced record
class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole FlowFile content as UTF-8 text
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "Source": "NiFi",
            "ID": obj['id'],
            "Name": obj['user']['screen_name']
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

flowFile = session.get()
# session.get() returns None when no FlowFile is queued, so only transfer inside this check
if flowFile is not None:
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    session.transfer(flowFile, REL_SUCCESS)
session.commit()

The import statements are required to use the NiFi and Java classes the script depends on.

The FlowFile is accessed through "session", a global variable that NiFi makes available to the script.

As you can see:

flowFile = session.get()

is how we get the next FlowFile from NiFi. We then pass it to the processing class via the session.write() method, which hands the class the FlowFile's input and output streams.

You can build up a new JSON object while referencing attributes from the source JSON as you can see in the script:

    newObj = {
          "Source": "NiFi",
          "ID": obj['id'],
          "Name": obj['user']['screen_name']
        }
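The mapping itself can be tried outside NiFi before pasting it into the processor. The following is a minimal sketch in plain Python, not the script above. The function name `transform_tweet` and the sample record are made up for illustration, and the sample only carries the fields the script reads.

```python
import json


def transform_tweet(text):
    """Build the reduced record from one tweet's JSON text (hypothetical helper)."""
    obj = json.loads(text)
    new_obj = {
        "Source": "NiFi",
        "ID": obj["id"],
        "Name": obj["user"]["screen_name"],
    }
    return json.dumps(new_obj, indent=4)


# Made-up sample input with only the fields the mapping needs
sample = json.dumps({"id": 42, "text": "hello", "user": {"screen_name": "nifi_user"}})
result = json.loads(transform_tweet(sample))
```

Inside NiFi, process() would call such a function on the text read from inputStream and write the returned string to outputStream.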

This should help you get started with using Python scripts inside NiFi.

I hope to see some posts of how you modify this to create more interesting flows.

Comments
avatar
Contributor

Hi,

Thank you very much for the tutorial. I am new to NiFi and trying out some use cases. I followed your steps to write a Jython script that reads from an XML file (RSS), converts it into a string, writes it to outputStream, and routes it to PutFile. The problem I am facing is that the data is getting queued in the connection to ExecuteScript and not getting into the processor. I am posting the code I wrote in the ExecuteScript Script Body here. Do I need to point it to a module directory?

import xml.etree.ElementTree as ET
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class xmlParser(StreamCallback):
 def __init__(self):
  pass
 def process(self,inputStream,outputStream):
  text= IOUtils.ToString(inputStream,StandardCharsets.UTF_8)
  xmlRoot = ET.fromstring(text);
  extracted_list=[]
  for elmnts in xmlRoot.fromall('item'):
   title= elmnts.find("title").text
   description = elmnts.find("description").text
   extracted_list.append(title)
   extracted_list.append(description)
  str_extract = ''.join(extracted_list)
  outputStream.write(bytearray(str_extract.encode('utf-8')))
  
  flowFile = session.get()
  if(flowFile!=None):
   flowFile = session.write(flowFile,xmlParser())
   flowFile = session.putAtrribute(flowFile, 'filename', 'rss_feed.xml')
  session.transfer(flowFile, REL_SUCCESS)
  session.commit()
  

If you can help me with this, it would be great.

6679-nifi-capture.png

avatar
Guru

That is strange. I see that you loop the failures back into the script processor. Can you delete that and route failure and success to the PutFile processor?

avatar
Contributor

I did as you said but to no avail. It's the same. And, there's nothing populating the bulletin board for me to debug.

avatar

Hi

We are having trouble with this line: flowFile = session.write(flowFile, ModJSON())

javax.script.ScriptException: TypeError: write(): 1st arg can't be coerced to int , byte[] in <script> at line number 64

What do you think the problem is?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback




class ModJSON(StreamCallback):


    def __init__(self):
        pass


    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        endorsements = obj['endorsement'].split(',')
        categoryIds = obj['categoryIds'].split(',')
        orderItemIds = obj['orderItemIds'].split(',')
        seq_num = 0
        seq_num1 = 1
        for endorsement in endorsements:
            if seq_num == 0 and seq_num1 == len(endorsements):
                newObj = '[{"endorsement":' + endorsement \
                    + ',"eventId":"' + obj['eventId'] \
                    + '", "categoryId":"' + categoryIds[seq_num] \
                    + '", "createDate":"' + obj['createDate'] \
                    + '", "buyerId":"' + obj['buyerId'] \
                    + '", "channel":"' + obj['channel'] + '", "city":"' \
                    + obj['city'] + '", "orderItemId": "'+orderItemIds[seq_num]+'"}]'
                seq_num += 1
                seq_num1 += 1
            elif seq_num == 0:
                newObj = '[{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'
                        ] + '", "city":"' + obj['city'] + '", "orderItemId": "'+orderItemIds[seq_num]+'"},'
                seq_num += 1
                seq_num1 += 1
            elif seq_num1 == len(endorsements) :
                newObj += '{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'
                        ] + '", "city":"' + obj['city'] + '", "orderItemId": "'+orderItemIds[seq_num]+'"}]'
                seq_num += 1
                seq_num1 += 1
            else:
                newObj += '{"endorsement":' + endorsement + ',"eventId":"' \
                    + obj['eventId'] + '", "categoryId":"' \
                    + categoryIds[seq_num] + '", "createDate":"' \
                    + obj['createDate'] + '", "buyerId":"' \
                    + obj['buyerId'] + '", "channel":"' + obj['channel'
                        ] + '", "city":"' + obj['city'] + '", "orderItemId": "'+orderItemIds[seq_num]+'"},'
                seq_num += 1
                seq_num1 += 1
        outputStream.write(bytearray(newObj.encode('utf-8')))




flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, 'filename',
                                    flowFile.getAttribute('filename'
                                    ).split('.')[0] + '_translated.json'
                                    )
session.transfer(flowFile, REL_SUCCESS)
session.commit()

avatar
Guru

It looks like the session.get() block is indented inside a method of the class, so all that script does is define a class and exit. You need to move the indents so that the session.get() block is outside the class definition (no indent at all). That might fix the issue.
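As a rough sketch of that layout, runnable outside NiFi: the parsing lives in a plain function (the name `extract_item_text` and the sample feed are made up for illustration) that process() would call, and the session.get() block would start at column 0 after the class. The sketch uses `iter('item')` where the posted script has `fromall`, which ElementTree does not provide. Assuming the feed nests its items under a channel element, `iter` reaches them at any depth.

```python
import xml.etree.ElementTree as ET


def extract_item_text(xml_text):
    """Concatenate the title and description of every <item> (hypothetical helper)."""
    root = ET.fromstring(xml_text)
    parts = []
    # iter() walks the whole tree, so items nested under <channel> are found
    for item in root.iter("item"):
        parts.append(item.find("title").text)
        parts.append(item.find("description").text)
    return "".join(parts)


# Made-up two-item feed for illustration
sample_rss = (
    "<rss><channel>"
    "<item><title>First</title><description>One</description></item>"
    "<item><title>Second</title><description>Two</description></item>"
    "</channel></rss>"
)
extracted = extract_item_text(sample_rss)

# In the NiFi script, the block starting with
#   flowFile = session.get()
# goes here, at column 0, after the class definition has ended.
```

The posted script also has `IOUtils.ToString` and `session.putAtrribute`, which look like typos for `IOUtils.toString` and `session.putAttribute`.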

avatar
Contributor

Hi,

My flowFile seems to be null although it's being generated by a preceding GetFile processor. A script like yours logs a NullPointerException at the session.transfer line, because flowFile appears to be null. I modified the script to log, and the logs show that the flowFile is null. What workaround can I use?

flowFile = session.get()
if (flowFile != None):
	log.info('is not null') # This is not being logged
	flowFile = session.write(flowFile, ModJSON())
else:
	log.info('is null') # This is being logged

session.transfer(flowFile, REL_SUCCESS) # NullPointerException thrown here
session.commit()
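
session.get() returns None when no FlowFile is available for that run, so one possible workaround is to call session.transfer only when a FlowFile was actually received. The sketch below shows that guard with a stand-in session so it can run outside NiFi. `FakeSession`, `handle_one`, and the string `REL_SUCCESS` are hypothetical and are not part of the NiFi API.

```python
REL_SUCCESS = "success"  # stand-in for the relationship NiFi provides to the script


class FakeSession(object):
    """Hypothetical stand-in for NiFi's session, only for trying the guard."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Like the real session, return None when nothing is queued
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        if flow_file is None:
            raise ValueError("cannot transfer a null FlowFile")
        self.transferred.append((flow_file, relationship))


def handle_one(session):
    """Transfer one FlowFile if there is one; report whether anything was handled."""
    flow_file = session.get()
    if flow_file is None:
        return False  # nothing to do this run, so skip transfer entirely
    session.transfer(flow_file, REL_SUCCESS)
    return True
```

In the actual script this means indenting the session.transfer line so that it sits inside the `if (flowFile != None):` block.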
avatar
New Contributor

In the example above, the ExecuteScript processor shows an error and writes zero bytes.

GetTwitter (In: 0/0 bytes)

ExecuteScript (In: 0/0 bytes)

PutFile (In: 738 / 59.59 KB) How?

Is this example working?

avatar
Contributor

Hi @Vasilis Vagias

Great article.

I am scraping Facebook page contents using Python and want to use the ExecuteScript processor to get all the posts returned by a Python function and pass them on to a Solr processor for indexing. Currently I am writing the contents returned by Facebook to a file; I want to write those contents to the output stream instead and pass them on to the next processor. Can you please share the steps, in terms of the example given in your article?

Can I use the outputStream object in any Python function and use it for writing records? I don't think creating an inner class is mandatory. Also, does it allow writerow()-style functionality?

Appreciate showing me the way here. Thanks.

avatar
@Vasilis Vagias

I am trying to create a data flow with NiFi using Python scripts. To begin the process I used the ExecuteProcess processor, and to execute the subsequent scripts I am trying to use the ExecuteScript processor, as I need to build a dependency into my flow. I want the subsequent job to run only if the job it depends on executes successfully.

However, I see that the ExecuteProcess job runs successfully but the subsequent ExecuteScript job does not start. I also see that the relationship queue is empty. How can I make the flow run successfully?

Here are the screenshots for your reference:

60486-capture.png

60487-capture.png

avatar

Hello @Vasilis Vagias, some other tutorials state: "NOTE: ExecuteScript will perform a session.commit() at the end of each execution to ensure the operations have been committed. You do not need to (and should not) perform a session.commit() within the script." (Source: Link)

Do we really need to use the commit statement?