Support Questions

Find answers, ask questions, and share your expertise

How to split particular data within JSON using the SplitJson processor in Apache NiFi?

New Contributor

I have this data and I need to split VisitList's content into separate flowfiles, one by one.

What should I write in the JsonPath Expression so that the data gets split? I tried $.*, but it treats the given data as a single file, while I want to split on the basis of VisitList.

Data:

[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      },
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]

I want to split the data into flowfiles like this.

1st flowfile:

[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "HR",
        "S2": "Accountant"
      }
    ]
  }
]


2nd flowfile:

[
  {
    "employer": "98765",
    "loc_id": "312",
    "topId": "Management",
    "VisitList": [
      {
        "S1": "Manager",
        "S2": "Sr. Manager"
      }
    ]
  }
]

I tried putting $.* in the JsonPath Expression, but it didn't work the way I want.

7 REPLIES

Community Manager

@Dracile Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @steven-matison and @cotopaul who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator




@Dracile,


I do not think that SplitJson is the correct processor for you. What you are trying to achieve might be possible using some JOLT transformations. Unfortunately, I am not near a computer to test a correct transformation, but I know that @SAMSAL has plenty of experience with JOLT and he might be able to further assist you.
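
For reference, an untested sketch of such a transformation (a single shift spec, assuming the single-object input array from your question): it copies the parent fields next to each VisitList entry, so that a SplitJson on $ afterwards yields exactly the two flowfiles you want.

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "VisitList": {
          "*": {
            "@(2,employer)": "[&1].employer",
            "@(2,loc_id)": "[&1].loc_id",
            "@(2,topId)": "[&1].topId",
            "@": "[&1].VisitList[]"
          }
        }
      }
    }
  }
]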

New Contributor

But I need to send the files one by one. I have split only the content of VisitList (just the two entries S1 and S2), which SplitJson then split into 2 files, but how can I process those two files one by one?



@Dracile If you are looking to iterate through the results inside of your upstream JSON object, you need QueryRecord with a JSON Reader and Writer. This allows you to provide the upstream schema (reader), the downstream schema (writer), and a query against the flowfile. This will, unfortunately, lose the original object values.


You can find an example here:


https://github.com/cldr-steven-matison/NiFi-Templates/blob/main/QueryRecord_Sample.json


You will need to modify the JSON object in GenerateFlowFile, adjust the Reader/Writer, and point the results query at the $.VisitList[] array. Once you have this little test working, take the logic to your final flow.
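
To illustrate the caveat above about losing the original object values: if the query pulls out only the VisitList elements, the records written downstream would look something like this, without employer, loc_id, and topId:

[ {
  "S1" : "HR",
  "S2" : "Accountant"
}, {
  "S1" : "Manager",
  "S2" : "Sr. Manager"
} ]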

Community Manager

@Dracile Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. Thanks.


Regards,

Diana Torres,
Community Moderator



Expert Contributor

@Dracile Instead, I recommend using the ForkRecord processor.


As @steven-matison mentioned, create a Record Reader and a Record Writer, then add a dynamic property with the record path /VisitList, set Mode to Split, and set Include Parent Fields to true (see the screenshot below).

[Screenshot: ForkRecord configuration with Record Reader/Writer set, Mode = Split, Include Parent Fields = true, and a dynamic property with record path /VisitList]

This will result in the next flowfile looking like this:

[ {
  "employer" : "98765",
  "loc_id" : "312",
  "topId" : "Management",
  "VisitList" : [ {
    "S1" : "HR",
    "S2" : "Accountant"
  } ]
}, {
  "employer" : "98765",
  "loc_id" : "312",
  "topId" : "Management",
  "VisitList" : [ {
    "S1" : "Manager",
    "S2" : "Sr. Manager"
  } ]
} ]


Then you could split on $ using a SplitJson processor, or, even better, continue using record-oriented processors for better performance 🙂

Super Collaborator

I would do this in a single step with an InvokeScriptedProcessor and the following Groovy code:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
// NiFi API imports required when this script runs inside InvokeScriptedProcessor
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

class GroovyProcessor implements Processor {
    PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
        .name("BATCH_SIZE")
        .displayName("Batch Size")
        .description("The number of incoming FlowFiles to process in a single execution of this processor.")
        .required(true)
        .defaultValue("1000")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()
    
    ComponentLog log 
    
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS] as Set }
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { Collections.unmodifiableList([BATCH_SIZE]) as List<PropertyDescriptor> }
    String getIdentifier() { null }

    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()
    
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger())
            if (!flowFiles) return

            flowFiles.each { flowFile ->
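                // Attributes for the flowfiles we will create; mime.type lets downstream processors treat them as JSON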
                Map customAttributes = [ "mime.type": "application/json" ]
                List data = null
                session.read(flowFile, { 
                    inputStream -> data = jsonSlurper.parseText(IOUtils.toString(inputStream, StandardCharsets.UTF_8)) 
                } as InputStreamCallback)
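                // Fan out: one new flowfile per VisitList entry, copying the parent fields and wrapping the visit in a single-element list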
                data.each { entry -> 
                    entry.VisitList.each { visit ->
                        Map newData = [:]
                        newData.put("employer", entry.employer)
                        newData.put("loc_id", entry.loc_id)
                        newData.put("topId", entry.topId)
                        newData.put("VisitList", [visit])
                        FlowFile newFlowFile = session.create()
                        newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson([newData]).getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback)
                        newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                        session.transfer(newFlowFile, REL_SUCCESS)
                    }
                }
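                // Every visit is now its own flowfile; the original input flowfile is no longer needed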
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
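
To try it, set InvokeScriptedProcessor's Script Engine to Groovy and paste the script into the Script Body property. Each incoming flowfile is removed and replaced by one flowfile per VisitList entry on the success relationship, so downstream processors receive the visits one by one, as asked for above.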