Created 03-12-2018 12:21 PM
I am trying to create a Python script in NiFi that:
What i ve done so far:
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback
class OutputWrite(OutputStreamCallback, obj):
def __init__(self):
self.obj = obj
def process(self, outputStream):
outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))
###end class###
flowfile = session.get()
if flowfile != None:
**#1) Get flowfile attributes**
headers = {
'Accept-Encoding': 'gzip, deflate, br',
'Accept': 'application/json, text/plain, */*',
'Cache-Control': 'no-cache',
'Ocp-Apim-Trace': 'true',
'Authorization': flowfile.getAttribute('Authorization')
}
collection = flowfile.getAttribute('collection')
dataset = flowfile.getAttribute('dataset')
**#2)Get flowfile content**
stream_content = session.read(flowfile)
text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
json_content = json.loads(text_content)
records = json_content['result']['count']
pages = records/10000
**#3) Write flowfile attributes**
flowfile = session.putAttribute(flowfile, 'collection', collection)
flowfile = session.putAttribute(flowfile, 'dataset', dataset)
**#API operations: output_json with desired data**
output_json = {some data}
**#4) Write final JSON data to output flowfile**
flowfile = session.write(flowfile, OutputWrite(output_json))
session.transfer(flowfile, REL_SUCCESS)
session.commit()My problem is that i can't find a way to pass a reference to the desired output_json object as an argument in the OutputStreamCallback class. Any ideas on how to resolve this or maybe a better approach? Is it maybe easier to perform all API operations in this case within the process function of the class, but then how do i get access to the incoming flowfile attributes within the process function (requires a session or a flowfile object) ? Any help much appreciated!
Created 03-13-2018 12:04 PM
Solved, much simpler than i thought. You just need to pass the flowfile to the initialization function of the CallBack class.
Created 03-13-2018 11:09 AM
Seems i could do all API operations within the process function of the CallBack class, but in this case i need access to the flowfile attributes besides the content within the class scope. Tried to pass a flowfile reference to the class definition, but failed. Any ideas?
Created 03-13-2018 12:04 PM
Solved, much simpler than i thought. You just need to pass the flowfile to the initialization function of the CallBack class.
Created on 08-29-2023 08:39 AM - edited 08-29-2023 08:39 AM
Can you help me with a similar issue?
My code:
Created 08-21-2018 01:53 AM
Hi Balalaika, I dont understand the answer you have posted. Can you please explain a bit more? Also from where does the obj comes in the OutputWrite class?
Created 08-21-2018 11:31 AM
If you have not already, please review this 3 part series on ExecuteScript. It was very helpful for me when I was working with my custom script and ExecuteScript Processor.
https://community.hortonworks.com/content/kbentry/75032/executescript-cookbook-part-1.html
Created 08-23-2018 04:42 PM