How to split a nested array in a Jolt transformation in Apache NiFi


Hi Team, please help me out with this Jolt specification.

Note :

1. Resourcename is the last element of ResourceId; it is a new variable that we need to add to the expected output.

2. The Tags field needs to be copied and split as shown in the expected output.


"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/ECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish"
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/HCLTECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish1"
Expected Output:

"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/ECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish",
"Resourcename": "pmoapps",
"Name": "PMOapplication",
"Owner": "Breil sathish"
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/HCTlembvs/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish1",
"Resourcename": "pmoapps",
"Name": "PMOapplication",
"Owner": "Breil sathish1"




Super Collaborator

If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy based InvokeScriptedProcessor should create the output you posted.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

/**
 * Scripted processor that reads a JSON array of resource objects from each
 * incoming FlowFile and rewrites it so that every element also carries:
 *   - "Resourcename": the last "/"-separated segment of "ResourceId"
 *   - "Name" and "Owner": values parsed out of the escaped "Tags" string
 * The original "ResourceId" and "Tags" values are copied through unchanged.
 *
 * NOTE(review): the posted snippet was truncated (builder chains, closing
 * braces, the output write). The property name, default value, validator and
 * relationship names below are reconstructed -- confirm before relying on them.
 * NiFi types (Processor, PropertyDescriptor, ...) are referenced without
 * imports, presumably because the scripting engine pre-imports them.
 */
class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    /** Captures the logger supplied by the initialization context. */
    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    /** @return the relationships this processor can route FlowFiles to. */
    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    /** No custom validation is performed; an empty result means "valid". */
    Collection<ValidationResult> validate(ValidationContext context) {
        return Collections.emptyList()
    }

    /** @return the descriptor whose name matches, or null when there is none. */
    PropertyDescriptor getPropertyDescriptor(String name) {
        return BATCH_SIZE.name == name ? BATCH_SIZE : null
    }

    /** Property changes require no reaction. */
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    }

    /** @return an unmodifiable list of the properties this processor supports. */
    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(BATCH_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    /** No identifier is provided by the script itself. */
    String getIdentifier() {
        return null
    }

    void onScheduled(ProcessContext context) throws ProcessException {
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
    }

    void onStopped(ProcessContext context) throws ProcessException {
    }

    /** Keeps the logger in sync if the framework supplies one after initialize(). */
    void setLogger(ComponentLog logger) {
        log = logger
    }

    /**
     * Pulls up to BATCH_SIZE FlowFiles, rewrites each one's JSON content and
     * routes it to REL_SUCCESS with mime.type set to application/json.
     *
     * @throws ProcessException declared by the Processor contract; any Throwable
     *         raised while processing is logged, the session is rolled back,
     *         and the Throwable is rethrown.
     */
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                flowFile = session.write(flowFile, { inputStream, outputStream ->
                    List<Map> data = jsonSlurper.parse(inputStream)
                    data = data.collect { Map resource ->
                        // Tags arrives as the inside of a JSON object with its
                        // outer quotes and braces stripped (e.g. Name": "x","Owner": "y),
                        // so wrapping it in {" ... "} turns it back into parseable JSON.
                        Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
                        [
                            "Name": tags.Name,
                            "Owner": tags.Owner,
                            "ResourceId": resource.ResourceId,
                            "Resourcename": resource.ResourceId.split("/").last(),
                            "Tags": resource.Tags
                        ]
                    }
                    // Without this write the FlowFile content would be left empty.
                    outputStream.write(JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8))
                } as StreamCallback)
                session.putAllAttributes(flowFile, customAttributes)
                session.transfer(flowFile, REL_SUCCESS)
            }
            // The session was created here from the factory, so it must be committed here.
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

// Expose the processor instance to the hosting script engine through the
// script-level variable `processor`.
// NOTE(review): InvokeScriptedProcessor presumably looks the instance up by this exact name -- confirm.
processor = new GroovyProcessor()