Created on 07-17-2019 02:35 PM - edited 08-17-2019 04:46 PM
I have a workflow which loads JSON templates from SQL which contain attribute placeholders:
{ "agent": "${hostname(true)}", "startDate": "${startDate}", "endDate": "${endDate}", "data": "${data}" }
These templates contain placeholder attributes which I'd like to replace with the value of real attributes in the Flowfile e.g. ${endDate}
The SQL results are converted to JSON using ConvertAvroToJSON and then converted to an attribute named: analysisMethodArgs using EvaluateJsonPath.
I've tried ReplaceText to replace the Flowfile content with the analysisMethodArgs attribute, but it still results in the placeholders in the text and not the actual attribute values.
Is this possible?
Created 09-05-2019 05:55 AM
An ExecuteScript script for those who also need to do this and get valid JSON back:
import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil
def remove_controls(text):
    """Return *text* with every line feed and carriage return removed."""
    # One pass over a character class instead of two chained substitutions.
    return re.sub('[\n\r]', '', text)
class TransformCallback(StreamCallback):
    """Stream callback that expands ``${name}`` placeholders in the content.

    Every attribute of the wrapped FlowFile is substituted, by exact name,
    for the literal text ``${name}`` in each line read from the input stream.
    """

    def __init__(self, flowFile):
        # FlowFile whose attributes supply the replacement values.
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        """Copy the input to the output with attribute placeholders expanded.

        Attribute values are reduced to ASCII and passed through
        ``remove_controls`` before insertion; if that clean-up fails the raw
        value is used instead.  Lines are re-joined with ``\\n`` and the
        result is written only if it parses as a JSON document.

        Raises:
            Exception: "COULD NOT POST INVALID JSON" when the substituted
                content is not valid JSON.  Any failure is printed to stdout
                with its traceback and then re-raised.
        """
        try:
            # Prepare each (pattern, value) pair once instead of once per line.
            substitutions = []
            for name, value in dict(self.flowFile.getAttributes()).items():
                try:
                    value = remove_controls(value.encode('ascii', 'ignore'))
                except Exception:
                    # Best effort: keep the untouched attribute value.
                    pass
                # re.escape makes the attribute name match literally, so a
                # '.' in e.g. "mime.type" is no longer a regex wildcard.
                pattern = re.compile(re.escape('${' + name + '}'))
                substitutions.append((pattern, value))

            output = []
            for line in FileUtil().wrap(inputStream).readlines():
                for pattern, value in substitutions:
                    # A callable replacement inserts the value verbatim;
                    # a plain string would be parsed as a regex template
                    # and mangle backslashes or group references.
                    line = pattern.sub(lambda _match, value=value: value, line)
                output.append(line.rstrip("\n"))

            text = '\n'.join(output)
            try:
                # Parse the final text: serialising a list of strings (as the
                # code used to do) succeeds regardless of what they contain.
                json.loads(text)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")
            outputStream.write(text)
        except:
            # Log every failure to stdout before propagating it unchanged.
            traceback.print_exc(file=sys.stdout)
            raise
# Take up to 10 queued FlowFiles and rewrite each one through
# TransformCallback.
flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    try:
        flowFile = session.write(flowFile, TransformCallback(flowFile))
    except:
        # A FlowFile that cannot be transformed goes to the failure
        # relationship instead of aborting the whole batch.  The bare except
        # is deliberate: presumably errors raised on the Java side of
        # session.write must be caught here as well -- TODO confirm.
        traceback.print_exc(file=sys.stdout)
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
Created 09-05-2019 05:55 AM
An ExecuteScript script for those who also need to do this and get valid JSON back:
import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil
def remove_controls(text):
    """Return *text* with every line feed and carriage return removed."""
    # One pass over a character class instead of two chained substitutions.
    return re.sub('[\n\r]', '', text)
class TransformCallback(StreamCallback):
    """Stream callback that expands ``${name}`` placeholders in the content.

    Every attribute of the wrapped FlowFile is substituted, by exact name,
    for the literal text ``${name}`` in each line read from the input stream.
    """

    def __init__(self, flowFile):
        # FlowFile whose attributes supply the replacement values.
        self.flowFile = flowFile

    def process(self, inputStream, outputStream):
        """Copy the input to the output with attribute placeholders expanded.

        Attribute values are reduced to ASCII and passed through
        ``remove_controls`` before insertion; if that clean-up fails the raw
        value is used instead.  Lines are re-joined with ``\\n`` and the
        result is written only if it parses as a JSON document.

        Raises:
            Exception: "COULD NOT POST INVALID JSON" when the substituted
                content is not valid JSON.  Any failure is printed to stdout
                with its traceback and then re-raised.
        """
        try:
            # Prepare each (pattern, value) pair once instead of once per line.
            substitutions = []
            for name, value in dict(self.flowFile.getAttributes()).items():
                try:
                    value = remove_controls(value.encode('ascii', 'ignore'))
                except Exception:
                    # Best effort: keep the untouched attribute value.
                    pass
                # re.escape makes the attribute name match literally, so a
                # '.' in e.g. "mime.type" is no longer a regex wildcard.
                pattern = re.compile(re.escape('${' + name + '}'))
                substitutions.append((pattern, value))

            output = []
            for line in FileUtil().wrap(inputStream).readlines():
                for pattern, value in substitutions:
                    # A callable replacement inserts the value verbatim;
                    # a plain string would be parsed as a regex template
                    # and mangle backslashes or group references.
                    line = pattern.sub(lambda _match, value=value: value, line)
                output.append(line.rstrip("\n"))

            text = '\n'.join(output)
            try:
                # Parse the final text: serialising a list of strings (as the
                # code used to do) succeeds regardless of what they contain.
                json.loads(text)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")
            outputStream.write(text)
        except:
            # Log every failure to stdout before propagating it unchanged.
            traceback.print_exc(file=sys.stdout)
            raise
# Take up to 10 queued FlowFiles and rewrite each one through
# TransformCallback.
flowFiles = session.get(10)
for flowFile in flowFiles:
    if flowFile is None:
        continue
    try:
        flowFile = session.write(flowFile, TransformCallback(flowFile))
    except:
        # A FlowFile that cannot be transformed goes to the failure
        # relationship instead of aborting the whole batch.  The bare except
        # is deliberate: presumably errors raised on the Java side of
        # session.write must be caught here as well -- TODO confirm.
        traceback.print_exc(file=sys.stdout)
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)