Created on 07-19-2024 07:26 AM - edited 07-19-2024 07:27 AM
I have a Python script that takes text input in a format like this:
{"order":{"Testorder":{"operation":"create","specification":{"name":"${name}","version":"1.0.0-SNAPSHOT"},"parameters":{"controllerName":"${controllername}","parameters":{"parameter1":"${value1}","parameter2":"${value2}"}}}}}
{"name": "TestService", "controllername": "MyController_d","value1": "test","value2": "speed"}
The first line is a JSON template with placeholders. The second line is another JSON string that holds the values to be substituted into the first line. (The whole input, i.e. the first and second lines together, is not valid JSON.)
It creates an output like this
{
"order" : {
"Testorder" : {
"operation" : "create",
"specification" : {
"name" : "TestService",
"version" : "1.0.0-SNAPSHOT"
},
"parameters" : {
"controllerName" : "MyController_d",
"parameters" : {
"parameter1" : "test",
"parameter2" : "speed"
}
}
}
}
}
I have implemented this using a Python script inside an ExecuteScript processor:
import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from string import Template
class PyStreamCallback(StreamCallback):
    """Stream callback that fills a JSON template with values from the same input.

    The input text is split at its first newline. The part before it is a
    JSON template whose strings contain ``${name}`` placeholders; the part
    after it is a JSON object mapping placeholder names to their values.
    The substituted template is written to the output stream as UTF-8.
    """

    # Textual clean-up applied, in exactly this order, after substitution.
    # The steps overlap (for example "u'" has to be stripped before "'" is
    # turned into '"'), so they are kept in an ordered sequence: iterating a
    # dict gives no ordering guarantee on Python 2 / Jython, which would make
    # the output depend on hash order.
    _REPLACEMENTS = (
        ("u'", "'"),
        ('u"', '"'),
        ('"{', '{'),
        ('}"', '}'),
        ("'", '"'),
        ("False", "false"),
        ("True", "true"),
        ('"[', "["),
        (']"', "]"),
    )

    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        """Read template and values from ``inputStream`` and write the result.

        Raises:
            ValueError: if the input has no newline separating template and
                values, if either part is not valid JSON, or if the values
                part is not a JSON object.
            KeyError: if the template uses a placeholder that is missing
                from the values object (raised by ``Template.substitute``).
        """
        try:
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            incoming_flow = input_text.split('\n', 1)
            if len(incoming_flow) < 2:
                raise ValueError("Input data does not contain both template and values.")
            template_str, values_str = [part.strip() for part in incoming_flow]

            # Parsed only for validation: the substitution below works on the
            # raw template text, not on the parsed structure.
            json.loads(template_str)
            parameter_obj = json.loads(values_str)
            if not isinstance(parameter_obj, dict):
                # Fail with a clear message instead of an obscure TypeError
                # from inside Template.substitute.
                raise ValueError("Values line must be a JSON object.")

            json_string = Template(template_str).substitute(parameter_obj)

            # Undo the Python repr artefacts that non-string values leave
            # behind when they are formatted into the template text.
            for old, new in self._REPLACEMENTS:
                json_string = json_string.replace(old, new)

            outputStream.write(bytearray(json_string.encode('utf-8')))
        except Exception:
            traceback.print_exc(file=sys.stdout)
            # Bare raise keeps the original traceback; "raise e" would
            # replace it on Python 2.
            raise
# Script body: take one flow file from the session, rewrite its content with
# PyStreamCallback and route it to success, or tag it with the error and
# route it to failure. `session`, `REL_SUCCESS` and `REL_FAILURE` are not
# defined in this script; they are expected to be supplied by the hosting
# ExecuteScript processor.
flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        # putAttribute returns the updated FlowFile; keep that reference so
        # the version carrying the attribute is the one that is transferred.
        flowFile = session.putAttribute(flowFile, 'mime.type', 'application/json')
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        # ValueError and every other exception are handled the same way, so a
        # single handler covers both.
        flowFile = session.putAttribute(flowFile, 'error_val', str({"Script.Exception": str(e)}))
        session.transfer(flowFile, REL_FAILURE)
Although there is a lot of string manipulation, it has been working fine so far.
Since Python is deprecated in the ExecuteScript processor, I was wondering whether it would be possible to do the same operations using any other processor in NiFi, without writing a custom script.
We're using NiFi version 1.24.0.
Can the ReplaceText processor be used to achieve this or is there any other recommendation?
Created 07-22-2024 12:44 PM
You could implement this using a Groovy script, which is forward and backward compatible. I've taken code written in NiFi 1.X and it works just fine in 2.X.