Python in ExecuteScript: Transform both flowfile attributes & content

Rising Star

I am trying to create a Python script in NiFi that:

  1. Reads some attributes from an incoming flowfile
  2. Reads the JSON content of the flowfile and extracts specific fields
  3. Writes attributes to the outgoing flowfile
  4. Overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the SUCCESS relationship, OR removes the old flowfile and creates a new one with the desired content

What I've done so far:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

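class OutputWrite(OutputStreamCallback):

    def __init__(self, obj):
        # obj is the Python object (e.g. a dict) to serialize into the flowfile content
        self.obj = obj

    def process(self, outputStream):
        # Serialize the object to JSON and write it as UTF-8 bytes
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

###end class###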

flowfile = session.get()

if flowfile is not None:

    # 1) Get flowfile attributes

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content

    stream_content = session.read(flowfile)
    try:
        text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    finally:
        # Close the input stream before the flowfile is modified
        stream_content.close()
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data

    output_json = {}  # placeholder: {some data} built from the API response

    # 4) Write final JSON data to output flowfile

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

My problem is that I can't find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to resolve this, or maybe a better approach? Would it be easier to perform all API operations inside the process function of the class? If so, how do I get access to the incoming flowfile attributes within the process function, which requires a session or a flowfile object? Any help much appreciated!

1 ACCEPTED SOLUTION

Rising Star

Solved; it was much simpler than I thought. You just need to pass the flowfile to the initialization function (__init__) of the callback class.
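
A minimal sketch of what that could look like, assuming the callback should merge the 'collection' and 'dataset' attributes into the outgoing JSON. The class name AttributeAwareWrite and the merge step are hypothetical and not from the original script; the try/except fallback only exists so the sketch can also be run outside NiFi.

```python
import json

try:
    # Available inside NiFi's ExecuteScript (Jython) runtime
    from org.apache.nifi.processor.io import OutputStreamCallback
except ImportError:
    # Stand-in base class so the sketch can be run outside NiFi
    OutputStreamCallback = object


class AttributeAwareWrite(OutputStreamCallback):
    """Hypothetical callback that receives the flowfile and the payload in __init__."""

    def __init__(self, flowfile, obj):
        # Keep references so process() can use both later
        self.flowfile = flowfile
        self.obj = obj

    def process(self, outputStream):
        # Copy the payload and add two flowfile attributes (illustrative only)
        payload = dict(self.obj)
        payload['collection'] = self.flowfile.getAttribute('collection')
        payload['dataset'] = self.flowfile.getAttribute('dataset')
        # Serialize to JSON and write the UTF-8 bytes as the new content
        outputStream.write(bytearray(json.dumps(payload).encode('utf-8')))
```

Inside the script it would be used as flowfile = session.write(flowfile, AttributeAwareWrite(flowfile, output_json)), so both the attributes and the new data are reachable from process() without touching the session there.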

5 REPLIES

Rising Star

@Matt Burgess

It seems I could do all API operations within the process function of the callback class, but in that case I need access to the flowfile attributes, not just the content, within the class scope. I tried to pass a flowfile reference in the class definition, but that failed. Any ideas?

Hi Balalaika, I don't understand the answer you have posted. Can you please explain a bit more? Also, where does obj come from in the OutputWrite class?

@Bharath Sudharsanam

If you have not already, please review this three-part series on ExecuteScript. It was very helpful for me when I was working on my custom script with the ExecuteScript processor.

https://community.hortonworks.com/content/kbentry/75032/executescript-cookbook-part-1.html
