Python in ExecuteScript: Transform both flowfile attributes & content
Labels: Apache NiFi
Created 03-12-2018 12:21 PM
I am trying to create a Python script in NiFi that:
- Reads some attributes from an incoming flowfile
- Reads the JSON content of the flowfile and extracts specific fields
- Writes attributes to the outgoing flowfile
- Overwrites the incoming flowfile with new content created in the script (e.g. from an API call that returns new JSON) and sends it to the SUCCESS relationship, OR removes the old flowfile and creates a new one with the desired content
What I've done so far:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):
    def __init__(self):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))
###end class###

flowfile = session.get()
if flowfile != None:
    # 1) Get flowfile attributes
    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }
    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content
    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)
    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes
    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data
    output_json = {some data}

    # 4) Write final JSON data to output flowfile
    flowfile = session.write(flowfile, OutputWrite(output_json))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
My problem is that I can't find a way to pass a reference to the desired output_json object as an argument to the OutputStreamCallback class. Any ideas on how to resolve this, or maybe a better approach? Would it be easier to perform all the API operations within the process function of the class? If so, how do I get access to the incoming flowfile's attributes within the process function, since that requires a session or a flowfile object? Any help much appreciated!
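The usual way to hand data to a callback is through its constructor: declare `obj` as a parameter of `__init__` rather than in the class's base list, store it on `self`, and read it back in `process()`. The sketch below is a plain-Python stand-in so that it runs outside NiFi. In ExecuteScript the class would extend `OutputStreamCallback` and NiFi would supply a Java `OutputStream`; here `io.BytesIO` plays that role, and the sample data is made up.

```python
import io
import json


class OutputWrite(object):
    """Stand-in for a Jython subclass of NiFi's OutputStreamCallback.

    The object to serialize is handed to the constructor and stored on
    self, so process() can reach it later when the stream is supplied.
    """

    def __init__(self, obj):
        self.obj = obj

    def process(self, outputStream):
        # Serialize the stored object and write its UTF-8 bytes to the stream.
        outputStream.write(json.dumps(self.obj).encode('utf-8'))


# Standalone usage: io.BytesIO stands in for the flowfile's OutputStream.
output_json = {"collection": "books", "pages": 3}  # hypothetical sample data
buffer = io.BytesIO()
OutputWrite(output_json).process(buffer)
print(buffer.getvalue().decode('utf-8'))  # prints {"collection": "books", "pages": 3}
```

Inside ExecuteScript the call would then be `flowfile = session.write(flowfile, OutputWrite(output_json))`, keeping the `bytearray(...)` wrapper used in the question when writing to the Java stream.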
Created 03-13-2018 12:04 PM
Solved, and much simpler than I thought. You just need to pass the flowfile to the initialization function (`__init__`) of the callback class.
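Following this answer, the callback can receive the flowfile itself in `__init__`, so that `process()` can read attributes while it transforms the content. The sketch below assumes NiFi's `StreamCallback` shape, `process(inputStream, outputStream)`, but runs in plain Python: `FakeFlowFile` is a hypothetical stand-in that only mimics `getAttribute()`, `io.BytesIO` replaces the Java streams, and the API call is replaced by echoing attributes and a page count. In ExecuteScript the content would be read with `IOUtils.toString(inputStream, StandardCharsets.UTF_8)`, as in the question, rather than `inputStream.read()`.

```python
import io
import json


class FakeFlowFile(object):
    """Hypothetical stand-in for a NiFi FlowFile: only getAttribute() is mimicked."""

    def __init__(self, attributes):
        self._attributes = dict(attributes)

    def getAttribute(self, name):
        return self._attributes.get(name)


class TransformCallback(object):
    """Stand-in for a Jython subclass of NiFi's StreamCallback.

    The flowfile is passed to __init__, so process() can read its
    attributes while it rewrites the content.
    """

    PAGE_SIZE = 10000

    def __init__(self, flowfile):
        self.flowfile = flowfile

    def process(self, inputStream, outputStream):
        # Read and parse the incoming JSON content.
        # (In ExecuteScript: IOUtils.toString(inputStream, StandardCharsets.UTF_8).)
        json_content = json.loads(inputStream.read().decode('utf-8'))
        records = json_content['result']['count']
        # Ceiling division, so a partial last page is still counted.
        pages = (records + self.PAGE_SIZE - 1) // self.PAGE_SIZE
        # Placeholder for the API operations: echo attributes and counts.
        output_json = {
            'collection': self.flowfile.getAttribute('collection'),
            'dataset': self.flowfile.getAttribute('dataset'),
            'records': records,
            'pages': pages,
        }
        outputStream.write(json.dumps(output_json).encode('utf-8'))


# Standalone usage with made-up attributes and content.
flowfile = FakeFlowFile({'collection': 'books', 'dataset': '2018'})
incoming = io.BytesIO(b'{"result": {"count": 25000}}')
outgoing = io.BytesIO()
TransformCallback(flowfile).process(incoming, outgoing)
result = json.loads(outgoing.getvalue().decode('utf-8'))
print(result['pages'])  # prints 3
```

In ExecuteScript this would be invoked as `flowfile = session.write(flowfile, TransformCallback(flowfile))`. The ceiling division counts a partial last page, whereas `records/10000` in the question truncates under Jython 2; which one is right depends on what the API expects.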
Created 03-13-2018 11:09 AM
It seems I could do all the API operations within the process function of the callback class, but in that case I need access to the flowfile attributes, not just the content, within the class scope. I tried to pass a flowfile reference to the class definition, but that failed. Any ideas?
Created on 08-29-2023 08:39 AM - edited 08-29-2023 08:39 AM
Can you help me with a similar issue?
My code:
Created 08-21-2018 01:53 AM
Hi Balalaika, I don't understand the answer you posted. Can you please explain a bit more? Also, where does obj come from in the OutputWrite class?
Created 08-21-2018 11:31 AM
If you have not already, please review this three-part series on ExecuteScript. It was very helpful for me when I was working with my custom script and the ExecuteScript processor.
https://community.hortonworks.com/content/kbentry/75032/executescript-cookbook-part-1.html
Created 08-23-2018 04:42 PM
