Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Update an attribute within another attribute in Apache Nifi

avatar
New Contributor

I have a workflow which loads JSON templates from SQL which contain attribute placeholders:

{
    "agent": "${hostname(true)}",
    "startDate": "${startDate}",
    "endDate": "${endDate}",
    "data": "${data}"
}

I'd like to replace these placeholders with the values of the corresponding attributes in the FlowFile, e.g. ${endDate}.

109884-1563373661265.png

The SQL results are converted to JSON using ConvertAvroToJSON, and the template is then extracted into an attribute named analysisMethodArgs using EvaluateJsonPath.

I've tried ReplaceText to replace the Flowfile content with the analysisMethodArgs attribute, but it still results in the placeholders in the text and not the actual attribute values.

Is this possible?

1 ACCEPTED SOLUTION

avatar
New Contributor

An ExecuteScript script for those who also need to do this and get valid JSON back:

import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil

def remove_controls(text):
    """Return *text* with every line feed and carriage return removed.

    Only the two line-break characters are stripped; all other characters
    (including other control characters such as tabs) are left untouched.
    """
    line_breaks = re.compile('[\n\r]')
    return line_breaks.sub('', text)

class TransformCallback(StreamCallback):
    """Stream callback that expands ``${attribute}`` placeholders in the content.

    Every literal ``${name}`` found in the incoming content is replaced with
    the value of the flow file attribute ``name``. Placeholders that do not
    exactly match an attribute name (for example ``${hostname(true)}``) are
    left as they are.
    """

    def __init__(self, flowFile):
        # Flow file whose attributes supply the replacement values.
        self.flowFile = flowFile

    def _build_substitutions(self):
        """Return a list of ``(compiled_pattern, replacement_callable)`` pairs.

        Built once per flow file so the per-line loop does not redo the
        encoding/cleaning work for every line.
        """
        attrs = self.flowFile.getAttributes()
        substitutions = []
        for name in dict(attrs).keys():
            value = attrs[name]
            try:
                # Drop non-ASCII characters and line breaks from the value.
                value = remove_controls(value.encode('ascii', 'ignore'))
            except Exception:
                # Best effort: if the value cannot be cleaned, use it as-is.
                value = attrs[name]

            # re.escape: attribute names routinely contain regex
            # metacharacters (e.g. the dot in "mime.type"), which must match
            # literally rather than as wildcards.
            pattern = re.compile(re.escape('${' + name + '}'))

            # A callable replacement is inserted verbatim; a plain string
            # would be parsed as a template, so backslashes in the value
            # ("C:\new", "\1") would be rewritten or raise an error.
            substitutions.append((pattern, lambda _match, value=value: value))
        return substitutions

    def process(self, inputStream, outputStream):
        """Read the content, substitute placeholders, validate and write it.

        Raises:
            Exception: with the message "COULD NOT POST INVALID JSON" when
                the substituted content does not parse as JSON. Any other
                error is printed to stdout and re-raised unchanged.
        """
        try:
            substitutions = self._build_substitutions()
            pf = FileUtil().wrap(inputStream)
            output = []
            for line in pf.readlines():
                for pattern, replacement in substitutions:
                    line = pattern.sub(replacement, line)
                output.append(line.rstrip("\n"))

            result = '\n'.join(output)
            try:
                # Parse the final text: json.dumps() on the list of lines
                # succeeds for any content and therefore validated nothing.
                json.loads(result)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")

            outputStream.write(result)
        except:
            # Bare except is deliberate: under Jython it also covers Java
            # exceptions from the streams. The error is always re-raised.
            traceback.print_exc(file=sys.stdout)
            raise

# Pull a batch of flow files from the session (get() is called with 10),
# rewrite each one through TransformCallback and route it to REL_SUCCESS.
for pending in session.get(10):
    if pending is not None:
        rewritten = session.write(pending, TransformCallback(pending))
        session.transfer(rewritten, REL_SUCCESS)

 

View solution in original post

1 REPLY 1

avatar
New Contributor

An ExecuteScript script for those who also need to do this and get valid JSON back:

import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil

def remove_controls(text):
    """Return *text* with every line feed and carriage return removed.

    Only the two line-break characters are stripped; all other characters
    (including other control characters such as tabs) are left untouched.
    """
    line_breaks = re.compile('[\n\r]')
    return line_breaks.sub('', text)

class TransformCallback(StreamCallback):
    """Stream callback that expands ``${attribute}`` placeholders in the content.

    Every literal ``${name}`` found in the incoming content is replaced with
    the value of the flow file attribute ``name``. Placeholders that do not
    exactly match an attribute name (for example ``${hostname(true)}``) are
    left as they are.
    """

    def __init__(self, flowFile):
        # Flow file whose attributes supply the replacement values.
        self.flowFile = flowFile

    def _build_substitutions(self):
        """Return a list of ``(compiled_pattern, replacement_callable)`` pairs.

        Built once per flow file so the per-line loop does not redo the
        encoding/cleaning work for every line.
        """
        attrs = self.flowFile.getAttributes()
        substitutions = []
        for name in dict(attrs).keys():
            value = attrs[name]
            try:
                # Drop non-ASCII characters and line breaks from the value.
                value = remove_controls(value.encode('ascii', 'ignore'))
            except Exception:
                # Best effort: if the value cannot be cleaned, use it as-is.
                value = attrs[name]

            # re.escape: attribute names routinely contain regex
            # metacharacters (e.g. the dot in "mime.type"), which must match
            # literally rather than as wildcards.
            pattern = re.compile(re.escape('${' + name + '}'))

            # A callable replacement is inserted verbatim; a plain string
            # would be parsed as a template, so backslashes in the value
            # ("C:\new", "\1") would be rewritten or raise an error.
            substitutions.append((pattern, lambda _match, value=value: value))
        return substitutions

    def process(self, inputStream, outputStream):
        """Read the content, substitute placeholders, validate and write it.

        Raises:
            Exception: with the message "COULD NOT POST INVALID JSON" when
                the substituted content does not parse as JSON. Any other
                error is printed to stdout and re-raised unchanged.
        """
        try:
            substitutions = self._build_substitutions()
            pf = FileUtil().wrap(inputStream)
            output = []
            for line in pf.readlines():
                for pattern, replacement in substitutions:
                    line = pattern.sub(replacement, line)
                output.append(line.rstrip("\n"))

            result = '\n'.join(output)
            try:
                # Parse the final text: json.dumps() on the list of lines
                # succeeds for any content and therefore validated nothing.
                json.loads(result)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")

            outputStream.write(result)
        except:
            # Bare except is deliberate: under Jython it also covers Java
            # exceptions from the streams. The error is always re-raised.
            traceback.print_exc(file=sys.stdout)
            raise

# Pull a batch of flow files from the session (get() is called with 10),
# rewrite each one through TransformCallback and route it to REL_SUCCESS.
for pending in session.get(10):
    if pending is not None:
        rewritten = session.write(pending, TransformCallback(pending))
        session.transfer(rewritten, REL_SUCCESS)