Created 08-30-2022 02:09 AM
Hi,
The input comes from a Jolt transform and looks something like this:
{
"dv_sys_id" : [ "23ec0223138d73006877", "983f853edb2c77007166f", "6a3d0a23138d73006877b3", "805d5339db8dd050f66" ]
}
The output should go to an InvokeHTTP processor as something like:
dv_sys_id: 23ec0223138d73006877, 983f853edb2c77007166f, 6a3d0a23138d73006877b3, 805d5339db8dd050f66
Remote URL: https://<url>${dv_sys_id}
How can I achieve this? Please help me out.
Created 08-30-2022 05:24 AM
One way to do this is to use the ExecuteScript processor with a script like the one below. This script will set the attribute dv_sys_id to the content that you want and will also write it to the content of the flowfile.
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.dv_sys_id = None

    def process(self, inputStream, outputStream):
        # Read the incoming flowfile content as UTF-8 text and parse it as JSON
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Join the array values into a single comma-separated string
        self.dv_sys_id = 'dv_sys_id=%s' % ','.join(obj['dv_sys_id'])
        # Replace the flowfile content with that string
        outputStream.write(bytearray(self.dv_sys_id.encode('utf-8', 'ignore')))

flow_file = session.get()
if flow_file is not None:
    callback = PyStreamCallback()
    flow_file = session.write(flow_file, callback)
    # Also expose the same string as a flowfile attribute
    flow_file = session.putAttribute(flow_file, 'dv_sys_id', callback.dv_sys_id)
    session.transfer(flow_file, REL_SUCCESS)
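To check the string-building step outside NiFi, here is a minimal sketch in plain Python of the same join logic. The function name build_attribute_value and its prefix and sep parameters are hypothetical, added only for illustration; they are not part of NiFi or of the script above. Note that with the default prefix the attribute value starts with "dv_sys_id=", so that text also ends up in the URL wherever ${dv_sys_id} is expanded; pass an empty prefix if only the IDs are wanted.

```python
import json


def build_attribute_value(flowfile_text, prefix="dv_sys_id=", sep=","):
    """Return the joined IDs from the JSON text, optionally with a prefix.

    Hypothetical helper: mirrors the join done inside process() above,
    but takes the flowfile content as a plain string.
    """
    obj = json.loads(flowfile_text)
    return prefix + sep.join(obj["dv_sys_id"])


# Sample input copied from the question
sample = (
    '{ "dv_sys_id" : [ "23ec0223138d73006877", "983f853edb2c77007166f", '
    '"6a3d0a23138d73006877b3", "805d5339db8dd050f66" ] }'
)

# Same shape as the script's attribute value (prefix included, no spaces)
print(build_attribute_value(sample))

# Only the IDs, separated by ", " as written in the question
print(build_attribute_value(sample, prefix="", sep=", "))
```

Which separator and prefix to use depends on what the remote endpoint expects, so treat the defaults here as an assumption rather than a requirement.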
Cheers,
André
Created 08-31-2022 04:03 AM
Thanks for your response, André.
Created 02-07-2024 03:29 AM
Can someone help me with a way to dynamically add attributes to a processor in NiFi?
@sathish3389 @araujo
Created 02-07-2024 11:34 AM
@SS-dev Welcome to the Cloudera Community!
As this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post. Thanks.
Regards,
Diana Torres,