Created 04-04-2017 05:37 PM
I have an incoming JSON message arriving in NiFi, and one of its values is an XML clob. I can get the attribute out and parse the XML using the XMLTransform processor, but how can I merge this data back into the original JSON? I tried using the merge processor, but I have the following concerns:
1) The merge processor is not able to combine two JSON files into one JSON document.
2) When multiple source messages are hitting NiFi, how does NiFi know which flowfiles to merge?
Created 04-06-2017 10:25 PM
Have you tried using the JoltTransformJSON processor? You should be able to create a Jolt specification to extract the information you want.
Created 04-24-2017 04:54 PM
If the JSON content is not too large to fit in memory, you could use ExecuteScript for this. Groovy has an XmlSlurper that can parse your XML clob (the script below reads it straight from the flow file content, so extracting it into an attribute with EvaluateJsonPath is not required), and a JsonSlurper (and JsonOutput) that can read and write JSON as objects. For example, given the input:
{ "key": "k1", "clob": "<root><attribute><name>attr1</name><value>Hello</value></attribute><attribute><name>attr2</name><value>World!</value></attribute></root>" }
You could use the following Groovy script in ExecuteScript:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import groovy.json.*
import groovy.util.*

def flowFile = session.get()
if (!flowFile) return

try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        // Parse JSON into object
        def json = new JsonSlurper().parseText(text)

        // Parse XML from clob field into object
        def xml = new XmlSlurper().parseText(json.clob)

        // Add a field to the JSON for each "attribute" tag in the XML
        xml.attribute.each { a ->
            json[a.name.toString()] = a.value.toString()
        }

        // Remove the clob field
        json.remove('clob')

        // Write the updated JSON object as the flow file content
        outputStream.write(JsonOutput.prettyPrint(JsonOutput.toJson(json)).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)

    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').tokenize('.')[0]+'_with_clob_fields.json')
    session.transfer(flowFile, REL_SUCCESS)
} catch(Exception e) {
    log.error('Error extracting XML fields into JSON', e)
    session.transfer(flowFile, REL_FAILURE)
}
For the given input, it generates the following output:
{ "attr1": "Hello", "attr2": "World!", "key": "k1" }
The script extracts the XML text from the "clob" field, then parses it into an object with XmlSlurper, then finds the individual "attribute" tags within, and adds each name/value pair to the original JSON object.
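If you want to try the transformation outside NiFi before wiring it into ExecuteScript, the same steps can be sketched as a plain Groovy script. This is only a rough sketch: mergeClobFields is a made-up helper name, not something NiFi provides, and the import assumes Groovy 3 or later, where XmlSlurper lives in groovy.xml (on Groovy 2.x it is groovy.util.XmlSlurper, as in the script above).

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.xml.XmlSlurper

// Hypothetical helper (not part of NiFi): applies the same steps as the
// ExecuteScript body to a JSON string and returns the updated JSON text.
String mergeClobFields(String jsonText) {
    // Parse the JSON text into a map
    def json = new JsonSlurper().parseText(jsonText)

    // Parse the XML held in the "clob" field
    def xml = new XmlSlurper().parseText(json.clob)

    // Copy each <attribute> name/value pair into the JSON map
    xml.attribute.each { a ->
        json[a.name.toString()] = a.value.toString()
    }

    // Drop the original clob field and serialize the result
    json.remove('clob')
    return JsonOutput.toJson(json)
}

// Same sample input as in the example above
def input = '''{ "key": "k1", "clob": "<root><attribute><name>attr1</name><value>Hello</value></attribute><attribute><name>attr2</name><value>World!</value></attribute></root>" }'''

def result = new JsonSlurper().parseText(mergeClobFields(input))
assert result.key == 'k1'
assert result.attr1 == 'Hello'
assert result.attr2 == 'World!'
assert !result.containsKey('clob')
```

Because each flow file carries both the JSON and its clob, everything happens within one flow file, so there is no need to correlate and merge separate flow files afterwards.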
For instances where the clob is not too large, it might be helpful to have an "xPath()" or "xmlPath" function in NiFi Expression Language (like the jsonPath() function added in NIFI-1660). Please feel free to file a Jira case to add this feature.
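To give an idea of what such a function would compute, here is a small Groovy sketch that evaluates an XPath expression against XML held in a string, using the standard javax.xml.xpath API. The helper name evalXPath is made up for illustration; it is not an existing NiFi Expression Language function.

```groovy
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.xpath.XPathFactory
import org.xml.sax.InputSource

// Hypothetical helper: evaluates an XPath expression against an XML string
// and returns the result as a string.
String evalXPath(String xmlText, String expression) {
    // Build a DOM document from the XML text
    def doc = DocumentBuilderFactory.newInstance()
            .newDocumentBuilder()
            .parse(new InputSource(new StringReader(xmlText)))

    // Evaluate the expression with the document as the context node
    return XPathFactory.newInstance().newXPath().evaluate(expression, doc)
}

// The clob from the example input above
def clob = '<root><attribute><name>attr1</name><value>Hello</value></attribute><attribute><name>attr2</name><value>World!</value></attribute></root>'

assert evalXPath(clob, "/root/attribute[name='attr1']/value") == 'Hello'
assert evalXPath(clob, "/root/attribute[name='attr2']/value") == 'World!'
```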