Support Questions

Find answers, ask questions, and share your expertise

Python in ExecuteScript: Transform both flowfile attributes & content

avatar
Expert Contributor

I am trying to create a Python script in NiFi that:

  1. Reads some attributes from an incoming flowfile
  2. Read the json content of the flowfile & extract specific fields
  3. Write attributes to outgoing flowfile
  4. Overwrite incoming flowfile with new content that is created in the script (e.g. API call that returns new json) and send it to SUCCESS relationship OR remove the old flowfile and create new with desired content

What i ve done so far:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):

def __init__(self):
    self.obj = obj

def process(self, outputStream):

    outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))

###end class###

flowfile = session.get()

if flowfile != None:

**#1) Get flowfile attributes**

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    **#2)Get flowfile content**

    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    **#3) Write flowfile attributes**

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    **#API operations: output_json with desired data**

    output_json = {some data}

    **#4) Write final JSON data to output flowfile**

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

My problem is that i can't find a way to pass a reference to the desired output_json object as an argument in the OutputStreamCallback class. Any ideas on how to resolve this or maybe a better approach? Is it maybe easier to perform all API operations in this case within the process function of the class, but then how do i get access to the incoming flowfile attributes within the process function (requires a session or a flowfile object) ? Any help much appreciated!

1 ACCEPTED SOLUTION

avatar
Expert Contributor

Solved, much simpler than i thought. You just need to pass the flowfile to the initialization function of the CallBack class.

View solution in original post

6 REPLIES 6

avatar
Expert Contributor

@Matt Burgess

Seems i could do all API operations within the process function of the CallBack class, but in this case i need access to the flowfile attributes besides the content within the class scope. Tried to pass a flowfile reference to the class definition, but failed. Any ideas?

avatar
Expert Contributor

Solved, much simpler than i thought. You just need to pass the flowfile to the initialization function of the CallBack class.

avatar
Rising Star

Can you help me with a similar issue?

My code:

import json
import collections
import java.io
import ast  
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback) :
  def __init__(self): 
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = ast.literal_eval(text)
    mail = obj['mail']
    outputStream.write(bytearray(json.dumps(obj['mail'], indent=4).encode('utf-8')))
    outputStream.write(bytearray(json.dumps(obj['id'], indent=4).encode('utf-8')))
     

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile, ModJSON())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  flowFile = session.putAttribute(flowFile, "mail", mail)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
 
 
I want to get "mail" value and create a new attribute for it. However, mail variable is not accessible when I insert it into the putattribute command. Any feedback?

avatar
Contributor

Hi Balalaika, I dont understand the answer you have posted. Can you please explain a bit more? Also from where does the obj comes in the OutputWrite class?

avatar
Super Guru
@Bharath Sudharsanam

If you have not already, please review this 3 part series on ExecuteScript. It was very helpful for me when I was working with my custom script and ExecuteScript Processor.

https://community.hortonworks.com/content/kbentry/75032/executescript-cookbook-part-1.html

avatar