Thank you for your question.
As you mentioned, the output flow file should carry two attributes.
You can try either of the following approaches:
1. Use the EvaluateJsonPath processor: add two properties (one per attribute) and set the value of each to a JSONPath expression.
For details on JSONPath, see https://github.com/json-path/JsonPath
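As a rough illustration only: the input JSON is not shown in this thread, so the attribute names (attr.one, attr.two) and the field paths below are hypothetical placeholders. Under that assumption, the processor settings might look something like this:

```
Destination   : flowfile-attribute
Return Type   : auto-detect
attr.one      : $.fieldOne
attr.two      : $.nested.fieldTwo
```

With Destination set to flowfile-attribute, each property you add becomes an attribute named after the property, holding the result of its JSONPath expression.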
2. Alternatively, write a custom script that builds the expected output dynamically.
For example, use ExecuteScript and point its Module Directory property at the jars the script references:
- Script Engine: Groovy
- Module Directory: the file system path where the jars reside. (Alternatively, leave this unset and load the jars at the JVM level, either through the NiFi bootstrap configuration or by placing the jars in the bootstrap extended lib directory.)
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.json.JsonSlurper
import com.fasterxml.jackson.databind.ObjectMapper

// Jackson must be loadable by this script, via Module Directory or the JVM classpath
// (jackson-annotations-2.9.0.jar, jackson-core-2.9.5.jar, jackson-databind-2.9.5.jar).
def flowFile = session.get()
if (!flowFile) return

def inputStr = ''
def jsonSlurper = new JsonSlurper()
try {
    // Read the flow file content as a UTF-8 string
    // (the closure is cast to InputStreamCallback).
    session.read(flowFile, { inputStream ->
        inputStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)

    // Assumes the content is a JSON object: each top-level key becomes an attribute.
    def map = jsonSlurper.parseText(inputStr)
    ObjectMapper mapper = new ObjectMapper()
    for (entry in map) {
        log.info("JSONKey: ${entry.key} = JSONValue: ${entry.value}")
        // Values are stored as JSON text, so string values keep their double quotes.
        flowFile = session.putAttribute(flowFile, entry.key as String, mapper.writeValueAsString(entry.value))
    }

    // Replace the content with an empty string; the data now lives in the attributes.
    def outputStr = ''
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(outputStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // ExecuteScript commits the session when the script finishes,
    // so no explicit session.commit() is needed.
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    // On failure, record the error as an attribute and as the content, then route to failure.
    flowFile = session.putAttribute(flowFile, 'errorMsg', e.toString())
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(e.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_FAILURE)
}
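If you want to check the key-to-attribute mapping before wiring it into NiFi, the core step can be tried on its own in a plain Groovy console. This is only a sketch: toAttributes is a hypothetical helper name, the sample JSON is made up, and groovy.json.JsonOutput stands in for Jackson's ObjectMapper so that no extra jars are needed (escaping of non-ASCII characters may differ from Jackson). It also assumes the top-level JSON value is an object.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: maps each top-level JSON key to the JSON text of its value,
// mirroring what the ExecuteScript loop stores in each attribute.
Map<String, String> toAttributes(String json) {
    def parsed = new JsonSlurper().parseText(json)
    // JsonOutput replaces ObjectMapper here (an assumption made for this sketch).
    parsed.collectEntries { key, value ->
        [(key as String): JsonOutput.toJson(value)]
    } as Map<String, String>
}

// Made-up sample input with a string, a number and a nested object.
def attrs = toAttributes('{"name":"abc","count":2,"nested":{"a":1}}')
attrs.each { k, v -> println "$k = $v" }
```

Note that string values keep their surrounding double quotes because each value is serialized back to JSON text. If you need the bare string in the attribute, convert the value with toString() instead of serializing it.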
============
Additional Info
============
You may find the following links useful:
1. For JSONPath -> https://github.com/json-path/JsonPath
2. For Apache NiFi scripts -> https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
3. For jackson-databind -> https://github.com/FasterXML/jackson-databind
Hope this helps.
Feel free to contact me if you have any questions.
Thanks,
Oliver Gong