Batch processing semi-structured JSON

Explorer

Dear Experts,
We want to learn how to perform batch processing on semi-structured JSON flow files in NiFi. We know that we can use the JoltTransformRecord, UpdateRecord, and QueryRecord processors, for which we need to set reader and writer controller services. However, we are unsure how to set up the reader and writer controllers for semi-structured data.
In the text below we have a multi-line JSON flow file in which the "code" field is static and the "other" field is an unstructured nested JSON object. We want to query the "code" field in batch mode and include the "other" field information in the resulting flow files for processing by subsequent processors. How should we set up the reader and writer controllers to achieve this?
Regards,

{"code": "6", "other": {"feild1": "data1"}}
{"code": "7", "other": {"feild2": "data2"}}
{"code": "8", "other": {"feild3": "data3"}}

4 REPLIES

Super Guru

Hi @Arash ,

 

I'm not sure there is a reader/writer that can work with semi-structured data. You could develop a custom reader/writer, but that would take some effort. Since your input arrives as multiple JSON records, one per line, you can either use a SplitText processor to split each JSON record into its own flowfile and process each record independently, or convert the input into a JSON array using two ReplaceText processors (see the screenshots below), then use QueryRecord and UpdateRecord with a JsonTreeReader/Writer.

 

First ReplaceText: replace the line breaks with commas

[screenshot: first ReplaceText configuration]

 

Second ReplaceText: surround the entire text with []

 

[screenshot: second ReplaceText configuration]
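
In case the screenshots are not visible, the two ReplaceText processors would be configured roughly like this (the exact regex might differ slightly from the screenshots, so treat it as a sketch):

First ReplaceText (turn the line breaks into commas):
    Evaluation Mode      = Entire text
    Replacement Strategy = Regex Replace
    Search Value         = \n
    Replacement Value    = ,

Second ReplaceText (wrap everything in a JSON array):
    Evaluation Mode      = Entire text
    Replacement Strategy = Regex Replace
    Search Value         = (?s)^(.*)$
    Replacement Value    = [$1]

If the content ends with a trailing line break you may end up with a trailing comma, in which case a stricter search value such as \n(?=\{) avoids it. After that, QueryRecord with a JsonTreeReader/Writer can filter in batch with a query like SELECT * FROM FLOWFILE WHERE code = '6'.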

Hope that helps.

Thanks

 

Explorer

Hi @SAMSAL ,

We have a high TPS, so we are constrained to batch processing. We do not want to split the records line by line; the flow files are already split into batches of 1000 records. We therefore need a batch-processing solution in NiFi.

Explorer

Dear @SAMSAL ,

Because the "other" field contains semi-structured JSON, we do not have a fixed schema to set in the JsonTreeReader. We are looking for a batch-processing solution for the flow files under that assumption.

Super Collaborator

This could possibly be achieved via an InvokeScriptedProcessor, but I would need to know the source data and the expected output. For example, taking what you posted, if you want to filter on code=6 and code=8 and keep only the values of "other" as individual FlowFiles, then something like this Groovy-based code could achieve that:

 

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

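// Scripted processor body for InvokeScriptedProcessor: filters line-delimited JSON records
// by their "code" value and routes each matching "other" object as its own FlowFile.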
class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    PropertyDescriptor FILTER_CODES = new PropertyDescriptor.Builder()
        .name("FILTER_CODES")
        .displayName("Filter Codes")
        .description("Codes to Filter On")
        .required(true)
        .defaultValue("6,8")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()
    
    ComponentLog log 
    
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE, FILTER_CODES]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()
    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
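            // Pull up to BATCH_SIZE FlowFiles from the incoming queue in a single onTrigger call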
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

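            // Parse the comma-separated FILTER_CODES property into a list of integer codes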
            String filterCodesString = context.getProperty(FILTER_CODES).getValue()
            List<Integer> filterCodes = filterCodesString.split(",").findAll { it.trim().matches("\\d+") }.collect { it as Integer }
            flowFiles.each { flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
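                // Stream the FlowFile content; each line is expected to be one JSON record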
                session.read(flowFile, { inputStream -> 
                    inputStream.eachLine { line ->
                        if (line?.trim()) {
                            Map dataMap = jsonSlurper.parseText(line)
                            if (filterCodes.contains(dataMap.code.toInteger())) {
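                                // For matching codes, emit a new FlowFile containing only the "other" object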
                                FlowFile newFlowFile = session.create()
                                newFlowFile = session.write(newFlowFile, 
                                    {
                                        outputStream -> outputStream.write(jsonOutput.toJson(dataMap.other).getBytes(StandardCharsets.UTF_8))
                                    } as OutputStreamCallback)
                                newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                                session.transfer(newFlowFile, REL_SUCCESS)
                            }
                        }
                    }
                } as InputStreamCallback)
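                // The original batch FlowFile is no longer needed once matching records have been emitted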
                session.remove(flowFile)                
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
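
To try this, paste the script into the Script Body property of an InvokeScriptedProcessor with the Script Engine set to Groovy. The property names (BATCH_SIZE, FILTER_CODES) and the default filter of "6,8" are just what I chose for this sketch and can be adjusted. With your sample input and that default filter, the processor would emit two FlowFiles containing roughly:

{"field1":"data1"}
{"field3":"data3"}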