- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Update an attribute within another attribute in Apache Nifi
- Labels:
-
Apache NiFi
Created on ‎07-17-2019 02:35 PM - edited ‎08-17-2019 04:46 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I have a workflow which loads JSON templates from SQL which contain attribute placeholders:
{ "agent": "${hostname(true)}", "startDate": "${startDate}", "endDate": "${endDate}", "data": "${data}" }
These templates contain placeholder attributes which I'd like to replace with the value of real attributes in the Flowfile e.g. ${endDate}
The SQL results are converted to JSON using ConvertAvroToJSON and then converted to an attribute named: analysisMethodArgs using EvaluateJsonPath.
I've tried using ReplaceText to replace the Flowfile content with the analysisMethodArgs attribute, but the output still contains the placeholders rather than the actual attribute values.
Is this possible?
Created ‎09-05-2019 05:55 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
An ExecuteScript script for those who also need to do this and get valid JSON back:
import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil
def remove_controls(text):
    """Return *text* with every line feed and carriage return removed.

    The transform callback uses this to flatten attribute values before
    substituting them into a single line of content.
    """
    # Two fixed characters: literal replacement is enough, no regex needed.
    return text.replace('\n', '').replace('\r', '')
class TransformCallback(StreamCallback):
    """Stream callback that expands ``${attribute}`` placeholders in content.

    Each ``${name}`` occurrence whose ``name`` is an attribute of the flow
    file is replaced with that attribute's value; placeholders that match no
    attribute are left as they are.  The substituted content must parse as
    JSON, otherwise the callback raises before anything is written.
    """

    def __init__(self, flowFile):
        """Keep the flow file whose attributes supply the replacement values."""
        self.flowFile = flowFile

    def _replacements(self):
        """Return a list of ``(compiled_pattern, value)`` pairs, one per attribute."""
        attrs = dict(self.flowFile.getAttributes())
        pairs = []
        for name in attrs:
            raw = attrs[name]
            try:
                # Characters outside ASCII are dropped and CR/LF removed
                # before the value is substituted into a content line.
                value = remove_controls(raw.encode('ascii', 'ignore'))
            except Exception:
                # Best effort: fall back to the attribute value as-is.
                value = raw
            # Escape the name so characters such as '.' or digits inside the
            # braces are matched literally instead of being read as regex.
            pattern = re.compile(re.escape('${' + name + '}'))
            pairs.append((pattern, value))
        return pairs

    def process(self, inputStream, outputStream):
        """Substitute placeholders line by line, validate, and write the result.

        Raises:
            Exception: with the message ``COULD NOT POST INVALID JSON`` when
                the substituted content does not parse as JSON.  Any other
                error is printed to stdout and re-raised unchanged.
        """
        try:
            replacements = self._replacements()
            content = FileUtil().wrap(inputStream)
            output = []
            for line in content.readlines():
                for pattern, value in replacements:
                    # A callable replacement inserts the value verbatim; a
                    # plain string would be treated as a template in which
                    # backslashes (e.g. in Windows paths) are interpreted.
                    line = pattern.sub(lambda _match: value, line)
                output.append(line.rstrip("\n"))
            text = '\n'.join(output)
            try:
                # Parse the final text: serialising the list of lines with
                # json.dumps would succeed regardless of what they contain.
                json.loads(text)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")
            outputStream.write(text)
        except:
            # Broad on purpose: print the traceback for any failure, then
            # let the original error propagate to the caller.
            traceback.print_exc(file=sys.stdout)
            raise
# Script entry point: fetch a batch via session.get(10), rewrite each flow
# file's content through TransformCallback and route it to REL_SUCCESS.
batch = session.get(10)
for flowFile in batch:
    if flowFile is not None:
        updated = session.write(flowFile, TransformCallback(flowFile))
        session.transfer(updated, REL_SUCCESS)
Created ‎09-05-2019 05:55 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
An ExecuteScript script for those who also need to do this and get valid JSON back:
import re
import sys
import copy
import json
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core import PyFile
from org.python.core.util import FileUtil, StringUtil
def remove_controls(text):
    """Return *text* with every line feed and carriage return removed.

    The transform callback uses this to flatten attribute values before
    substituting them into a single line of content.
    """
    # Two fixed characters: literal replacement is enough, no regex needed.
    return text.replace('\n', '').replace('\r', '')
class TransformCallback(StreamCallback):
    """Stream callback that expands ``${attribute}`` placeholders in content.

    Each ``${name}`` occurrence whose ``name`` is an attribute of the flow
    file is replaced with that attribute's value; placeholders that match no
    attribute are left as they are.  The substituted content must parse as
    JSON, otherwise the callback raises before anything is written.
    """

    def __init__(self, flowFile):
        """Keep the flow file whose attributes supply the replacement values."""
        self.flowFile = flowFile

    def _replacements(self):
        """Return a list of ``(compiled_pattern, value)`` pairs, one per attribute."""
        attrs = dict(self.flowFile.getAttributes())
        pairs = []
        for name in attrs:
            raw = attrs[name]
            try:
                # Characters outside ASCII are dropped and CR/LF removed
                # before the value is substituted into a content line.
                value = remove_controls(raw.encode('ascii', 'ignore'))
            except Exception:
                # Best effort: fall back to the attribute value as-is.
                value = raw
            # Escape the name so characters such as '.' or digits inside the
            # braces are matched literally instead of being read as regex.
            pattern = re.compile(re.escape('${' + name + '}'))
            pairs.append((pattern, value))
        return pairs

    def process(self, inputStream, outputStream):
        """Substitute placeholders line by line, validate, and write the result.

        Raises:
            Exception: with the message ``COULD NOT POST INVALID JSON`` when
                the substituted content does not parse as JSON.  Any other
                error is printed to stdout and re-raised unchanged.
        """
        try:
            replacements = self._replacements()
            content = FileUtil().wrap(inputStream)
            output = []
            for line in content.readlines():
                for pattern, value in replacements:
                    # A callable replacement inserts the value verbatim; a
                    # plain string would be treated as a template in which
                    # backslashes (e.g. in Windows paths) are interpreted.
                    line = pattern.sub(lambda _match: value, line)
                output.append(line.rstrip("\n"))
            text = '\n'.join(output)
            try:
                # Parse the final text: serialising the list of lines with
                # json.dumps would succeed regardless of what they contain.
                json.loads(text)
            except ValueError:
                raise Exception("COULD NOT POST INVALID JSON")
            outputStream.write(text)
        except:
            # Broad on purpose: print the traceback for any failure, then
            # let the original error propagate to the caller.
            traceback.print_exc(file=sys.stdout)
            raise
# Script entry point: fetch a batch via session.get(10), rewrite each flow
# file's content through TransformCallback and route it to REL_SUCCESS.
batch = session.get(10)
for flowFile in batch:
    if flowFile is not None:
        updated = session.write(flowFile, TransformCallback(flowFile))
        session.transfer(updated, REL_SUCCESS)
