Support Questions

Find answers, ask questions, and share your expertise

How to split the nested array in a Jolt transformation in Apache NiFi

avatar
Explorer

Hi Team, please help me out with this Jolt specification.

Note :

1. Resourcename is the last element of ResourceId; it is a new variable that we need to add to the expected output.

2. The Tags field needs to be copied and split as shown in the expected output.

Input:

[
{
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/ECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish"
},
{
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/HCLTECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish1"
}
]
Expected Output:

[
{
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/ECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish",
"Resourcename": "pmoapps",
"Name": "PMOapplication",
"Owner": "Breil sathish"
},
{
"ResourceId": "/subscriptions/bb842437aa4/resourceGroups/HCLTECHLABHENKEL/providers/Microsoft.Compute/virtualMachines/pmoapps",
"Tags": "Name\": \"PMOapplication\",\"Owner\": \"Breil sathish1",
"Resourcename": "pmoapps",
"Name": "PMOapplication",
"Owner": "Breil sathish1"
}
]

 

Thanks

1 REPLY 1

avatar
Super Collaborator

If the input will always be like your example, I would use Groovy to make the transformation. The following Groovy based InvokeScriptedProcessor should create the output you posted.

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

/**
 * Scripted NiFi processor that enriches a JSON array of resource records.
 *
 * Each incoming FlowFile is expected to contain a JSON array of objects with
 * a "ResourceId" path string and a "Tags" string holding the inside of a JSON
 * object without its outer brace-and-quote wrapper (for example
 * {@code Name": "x","Owner": "y}). For every record the processor emits
 * "ResourceId" and "Tags" unchanged and adds:
 * <ul>
 *   <li>"Resourcename" - the last "/"-separated segment of "ResourceId"</li>
 *   <li>"Name" and "Owner" - values parsed out of "Tags"</li>
 * </ul>
 *
 * FlowFiles that are transformed are routed to "success" with
 * {@code mime.type=application/json}; FlowFiles whose content cannot be
 * parsed or transformed are routed, unmodified, to "failure".
 */
class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("100")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()

    /** Captures the component logger supplied by the framework. */
    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    /** @return the "success" and "failure" relationships */
    Set<Relationship> getRelationships() {
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    /**
     * No custom validation is performed.
     *
     * @return an empty collection (no validation problems reported)
     */
    Collection<ValidationResult> validate(ValidationContext context) {
        return Collections.emptyList()
    }

    /**
     * Looks up a supported property by name.
     *
     * @param name the property name to resolve
     * @return {@code BATCH_SIZE} when the name matches it, otherwise {@code null}
     */
    PropertyDescriptor getPropertyDescriptor(String name) {
        return BATCH_SIZE.name == name ? BATCH_SIZE : null
    }

    /** Intentionally a no-op: nothing is cached from property values. */
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {

    }

    /** @return an unmodifiable list containing only {@code BATCH_SIZE} */
    List<PropertyDescriptor> getPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(BATCH_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    /** @return {@code null}; this script does not define its own identifier */
    String getIdentifier() {
        return null
    }

    /** Intentionally a no-op. */
    void onScheduled(ProcessContext context) throws ProcessException {

    }

    /** Intentionally a no-op. */
    void onUnscheduled(ProcessContext context) throws ProcessException {

    }

    /** Intentionally a no-op. */
    void onStopped(ProcessContext context) throws ProcessException {

    }

    /** Intentionally a no-op; the logger is taken from {@link #initialize}. */
    void setLogger(ComponentLog logger) {

    }

    /**
     * Pulls up to {@code BATCH_SIZE} FlowFiles and transforms each one.
     *
     * A FlowFile whose content fails to parse or transform is routed to
     * "failure" on its own, so one bad file does not force the whole batch
     * to be rolled back and retried. Any other error rolls back the session
     * and is rethrown.
     *
     * @throws ProcessException if the session cannot be processed
     */
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            Map<String, String> customAttributes = [ "mime.type": "application/json" ]
            for (FlowFile flowFile : flowFiles) {
                FlowFile transformed
                try {
                    transformed = session.write(flowFile, { inputStream, outputStream ->
                        transformContent(inputStream, outputStream)
                    } as StreamCallback)
                } catch (final Exception e) {
                    // The write failed, so the original FlowFile is still the current version.
                    log.error('{} failed to transform {}; routing to failure', [this, flowFile, e] as Object[])
                    session.transfer(flowFile, REL_FAILURE)
                    continue
                }
                // FlowFile objects are immutable: always continue with the returned reference.
                transformed = session.putAllAttributes(transformed, customAttributes)
                session.transfer(transformed, REL_SUCCESS)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }

    /**
     * Reads a JSON array of resource records and writes the enriched array as UTF-8 JSON.
     *
     * @param inputStream  source of the original JSON array
     * @param outputStream destination for the enriched JSON array
     */
    private void transformContent(InputStream inputStream, OutputStream outputStream) {
        List<Map> resources = jsonSlurper.parse(inputStream)
        List<Map> enriched = resources.collect { Map resource ->
            // "Tags" lacks its leading {" and trailing "}; restore them so it parses as an object.
            Map tags = jsonSlurper.parseText("{\"${resource.Tags}\"}")
            // Key order follows the expected output given in the question.
            [
                "ResourceId": resource.ResourceId,
                "Tags": resource.Tags,
                "Resourcename": resource.ResourceId.split("/").last(),
                "Name": tags.Name,
                "Owner": tags.Owner
            ]
        }
        outputStream.write(JsonOutput.toJson(enriched).getBytes(StandardCharsets.UTF_8))
    }
}

// Bind an instance to the script variable `processor` (the name InvokeScriptedProcessor
// is documented to look up for the scripted Processor - verify against your NiFi version).
processor = new GroovyProcessor()