Nifi

avatar
Explorer

Hi,

The input comes from a Jolt transform, and the data looks something like this:

{
"dv_sys_id" : [ "23ec0223138d73006877", "983f853edb2c77007166f", "6a3d0a23138d73006877b3", "805d5339db8dd050f66" ]
}

The output should go to an InvokeHTTP processor, something like:

 

dv_sys_id: 23ec0223138d73006877, 983f853edb2c77007166f, 6a3d0a23138d73006877b3, 805d5339db8dd050f66

 

Remote URL: https://<url>${dv_sys_id}

 

How can I achieve this? Please help me out.

 

 

 

1 ACCEPTED SOLUTION

avatar
Super Guru

@sathish3389 ,

 

One way to do this is to use the ExecuteScript processor with a script like the one below. This script will set the attribute dv_sys_id to the content that you want and will also write that same value to the content of the flowfile.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import json

class PyStreamCallback(StreamCallback):
    def __init__(self):
        # Holds the joined value so it can be set as an attribute afterwards
        self.dv_sys_id = None

    def process(self, inputStream, outputStream):
        # Read the incoming flowfile content as a UTF-8 string and parse the JSON
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Join the array elements into a single comma-separated string
        self.dv_sys_id = 'dv_sys_id=%s' % ','.join(obj['dv_sys_id'])
        # Replace the flowfile content with the joined string
        outputStream.write(bytearray(self.dv_sys_id.encode('utf-8', 'ignore')))

flow_file = session.get()
if flow_file is not None:
    callback = PyStreamCallback()
    flow_file = session.write(flow_file, callback)
    # Expose the same value as an attribute for use in Expression Language
    flow_file = session.putAttribute(flow_file, 'dv_sys_id', callback.dv_sys_id)
    session.transfer(flow_file, REL_SUCCESS)
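
If you want to check the joining logic before wiring it into NiFi, here is a minimal standalone sketch of the same transformation in plain Python. The function name build_dv_sys_id and the sep parameter are hypothetical, added only for illustration; they are not part of the script above or of any NiFi API.

```python
import json

def build_dv_sys_id(text, sep=","):
    """Join the 'dv_sys_id' array of a JSON document into one string.

    Hypothetical helper for local testing; mirrors the string built
    inside the ExecuteScript callback.
    """
    obj = json.loads(text)
    return "dv_sys_id=%s" % sep.join(obj["dv_sys_id"])

# Shortened sample based on the input shown in the question
sample = '{"dv_sys_id": ["23ec0223138d73006877", "983f853edb2c77007166f"]}'
result = build_dv_sys_id(sample)
print(result)  # dv_sys_id=23ec0223138d73006877,983f853edb2c77007166f
```

Passing sep=", " would give the comma-plus-space form shown in the question, if that is what the remote endpoint expects.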

 

 

Cheers,

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.


4 REPLIES 4

avatar
Explorer

Thanks for your response, André.

avatar
New Contributor

Can someone help me out with a way to dynamically add attributes to a processor in NiFi?
@sathish3389 @araujo 

avatar
Community Manager

@SS-dev Welcome to the Cloudera Community!

As this is an older post, you would have a better chance of receiving a resolution by starting a new thread. This will also be an opportunity to provide details specific to your environment that could aid others in assisting you with a more accurate answer to your question. You can link this thread as a reference in your new post. Thanks.


Regards,

Diana Torres,
Community Moderator

