Nifi Split JSON Expression

I am trying to split a JSON object with the SplitJson processor, but I cannot get it to work. I want each key to become an attribute, with the corresponding JSON array as the attribute value.

Here is my JSON:

{
  "book": [
    {
      "category": "reference",
      "author": "Nigel Rees",
      "title": "Sayings of the Century",
      "price": 8.95
    }
  ],
  "bicycle": [
    {
      "color": "red",
      "price": 19.95
    }
  ]
}

I want to split each record into a separate flow file, based on its key.

What is the correct JsonPath expression? Or is there another way to achieve this?

1 REPLY

Thank you for your question.
As I understand it, the output flow file should end up with two attributes, one per top-level key.

[Screenshot: expected flow file with two attributes]

You can try either of the following:
1. Use the EvaluateJsonPath processor: add two properties and set each one to a JSONPath expression.

[Screenshot: EvaluateJsonPath processor configuration]

For details on JSONPath, see https://github.com/json-path/JsonPath
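
The screenshot is not available as text, so here is a rough sketch of an EvaluateJsonPath configuration that would be consistent with the description. The property names book and bicycle and the Return Type setting are assumptions based on the JSON in the question, not values read from the screenshot.

```text
Destination : flowfile-attribute
Return Type : json
book        : $.book
bicycle     : $.bicycle
```

As far as I know, Return Type needs to be json here: with Destination set to flowfile-attribute, auto-detect resolves to scalar, and a path that evaluates to an array is then routed to failure.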

2. Alternatively, write a custom script that builds the expected output dynamically, e.g. with ExecuteScript, using its Module Directory property to point at the jars the script references.

- Script Engine: Groovy
- Module Directory: the filesystem path where the jars reside (alternatively, leave this unset and load the jars at the JVM level, either through the NiFi bootstrap configuration or by placing them in the bootstrap's extended lib directory)

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
import groovy.json.JsonSlurper
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
// The Jackson jars (jackson-annotations-2.9.0.jar, jackson-core-2.9.5.jar, jackson-databind-2.9.5.jar)
// must be loadable by this script: set Module Directory or configure the JVM classpath.

flowFile = session.get()
if (!flowFile) return

def inputStr = ''
def jsonSlurper = new JsonSlurper()

try {
    // Cast a closure with an inputStream parameter to InputStreamCallback
    // and read the whole flow file content as a UTF-8 string
    session.read(flowFile, { inputStream ->
        inputStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)

    // Parse the content; for the sample input this yields a map of key -> JSON array
    def map = jsonSlurper.parseText(inputStr)

    ObjectMapper mapper = new ObjectMapper()

    // Add one attribute per top-level key, with the value serialized back to JSON
    for (entry in map) {
        log.info("JSONKey: ${entry.key} = JSONValue: ${entry.value}")

        flowFile = session.putAttribute(flowFile, entry.key.toString(), mapper.writeValueAsString(entry.value))
    }

    // Replace the content with an empty string; the data now lives in the attributes
    def outputStr = ''
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(outputStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

} catch (e) {
    // On any error, record the message as an attribute and as the content, then route to failure
    flowFile = session.putAttribute(flowFile, 'errorMsg', e.toString())
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(e.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_FAILURE)
    session.commit()
}
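
The script above puts every key on one flow file as an attribute. If you would rather have one flow file per key, as the question asks, the core step is to map each top-level key to the JSON text of its value. The following is only a minimal sketch of that step: splitByKey is a hypothetical helper name (not a NiFi API), it assumes the top level of the document is a JSON object, and it uses Groovy's own JsonOutput instead of Jackson, so no extra jars should be needed.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper (not part of NiFi): maps each top-level key of a
// JSON object to the serialized JSON of its value.
Map<String, String> splitByKey(String json) {
    def root = new JsonSlurper().parseText(json)
    if (!(root instanceof Map)) {
        throw new IllegalArgumentException('Expected a JSON object at the top level')
    }
    root.collectEntries { key, value -> [(key.toString()): JsonOutput.toJson(value)] }
}

// Shortened version of the JSON from the question
def sample = '{"book":[{"category":"reference","price":8.95}],"bicycle":[{"color":"red","price":19.95}]}'
splitByKey(sample).each { key, value -> println "${key} -> ${value}" }
```

Inside ExecuteScript, each entry could then become its own flow file: create a child with session.create(flowFile), write the value to it with session.write, store the key with session.putAttribute, transfer the child to REL_SUCCESS, and finally drop the original with session.remove(flowFile).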


============
Additional Info
============

The following links may help:
1. JSONPath -> https://github.com/json-path/JsonPath
2. Apache NiFi scripting -> https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
3. jackson-databind -> https://github.com/FasterXML/jackson-databind


Hope this helps.
Feel free to contact me if you have any questions.

Thanks,
Oliver Gong