How to access the nested fields in a JSON using Expression Language in NiFi?

I have an input json as below:


    [
        {
            "orderId": "1234567890",
            "orderName": "Test1",
            "orderItems": [
                {
                    "orderItemId": "01",
                    "orderItemName": "test1-OrderItem1"
                },
                {
                    "orderItemId": "02",
                    "orderItemName": "test1-OrderItem2"
                }
            ]
        },
        {
            "orderId": "12235",
            "orderName": "Test2",
            "orderItems": [
                {
                    "orderItemId": "01",
                    "orderItemName": "test2-OrderItem1"
                }
            ]
        },
        {
            "orderId": "12236",
            "orderName": "Test3",
            "orderItems": [
                {
                    "orderItemId": "01",
                    "orderItemName": "test3-OrderItem1"
                }
            ]
        }
    ]


I want to convert it into plain text: one line with the order details, followed by one line of item details for each map in the orderItems list.
Here is the desired output:


1234567890 Test1
01 test1-OrderItem1
02 test1-OrderItem2
12235 Test2
01 test2-OrderItem1
12236 Test3
01 test3-OrderItem1


This is a JSON-to-CSV conversion, and I am using the ConvertRecord processor with the following reader and writer configured:
1. Record Reader - JsonTreeReader
2. Record Writer - FreeFormTextRecordSetWriter

In the Text property of the FreeFormTextRecordSetWriter controller service, I referenced the key names from the input JSON as ${keyName} and got the top-level values. On the next line, however, I want the orderItemId from every map in the orderItems list.

How can I access the values from the orderItems list?

Thanks in advance!




I would do this with a Groovy-based InvokeScriptedProcessor, using this code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils

// NiFi classes (Processor, FlowFile, PropertyDescriptor, ...) are expected to be
// pre-imported by NiFi's Groovy scripting engine, so they are not imported here.
class GroovyProcessor implements Processor {
    // The .name(), .required(), .defaultValue(), .addValidator() and .build() calls were
    // missing from the posted code; the values used below are assumptions.
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { flowFile ->
                // Parse the FlowFile content as a JSON array of orders
                List data = null
                session.read(flowFile, {
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
                } as InputStreamCallback)
                // One line per order, followed by one line per order item
                List outputData = []
                data.each { order ->
                    outputData.add("${order.orderId} ${order.orderName}")
                    order.orderItems.each { orderItem ->
                        outputData.add("${orderItem.orderItemId} ${orderItem.orderItemName}")
                    }
                }
                FlowFile newFlowFile = session.create()
                newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(outputData.join('\n').getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                session.transfer(newFlowFile, REL_SUCCESS)
                // Assumed (missing from the posted code): drop the original FlowFile,
                // since it is replaced by newFlowFile and must not be left untransferred
                session.remove(flowFile)
            }
            // Assumed (missing from the posted code): commit the session created above
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            // Assumed from the log message: roll back and penalize the FlowFiles
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()

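Don't let all that code scare you. The part that does the formatting is only the two nested loops in onTrigger: data.each { order -> ... } and, inside it, order.orderItems.each { orderItem -> ... }.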
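
To try the formatting logic on its own, outside NiFi, something like the following standalone Groovy script should work. It is only a sketch: `formatOrders` is a hypothetical helper name that does not appear in the processor, and the sample input is a shortened copy of the JSON from the question.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not in the original script): emits one line per order,
// followed by one line per item in that order's orderItems list.
List<String> formatOrders(List orders) {
    List<String> lines = []
    orders.each { order ->
        lines.add("${order.orderId} ${order.orderName}".toString())
        order.orderItems.each { orderItem ->
            lines.add("${orderItem.orderItemId} ${orderItem.orderItemName}".toString())
        }
    }
    return lines
}

// Shortened sample input, shaped like the JSON array in the question.
String json = '''
[
  {"orderId": "1234567890", "orderName": "Test1",
   "orderItems": [
     {"orderItemId": "01", "orderItemName": "test1-OrderItem1"},
     {"orderItemId": "02", "orderItemName": "test1-OrderItem2"}
   ]},
  {"orderId": "12235", "orderName": "Test2",
   "orderItems": [
     {"orderItemId": "01", "orderItemName": "test2-OrderItem1"}
   ]}
]
'''

List orders = new JsonSlurper().parseText(json) as List
println formatOrders(orders).join('\n')
// prints:
// 1234567890 Test1
// 01 test1-OrderItem1
// 02 test1-OrderItem2
// 12235 Test2
// 01 test2-OrderItem1
```

Inside the processor, the same loops run on the list parsed from each FlowFile, and the joined lines become the content of the new FlowFile.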


This is the generated output: