Support Questions

Find answers, ask questions, and share your expertise
Celebrating as our community reaches 100,000 members! Thank you!

How to Split an array in a json file into multiple flowfiles with custom size of elements in an array in Nifi ?


This is the content of my input json flowfile:


 My requirement is to divide it in such a way that each output flowfile has only 5 elements in it.
So if an objectIDs array has 17 elements, then I want 4 flowfiles with the same JSON format: the first 3 having 5 elements each, and the last one with the remaining 2.

for my example content provided above, I would want two flow files
First One:


Second one :


How do I do this in Nifi ?


Super Collaborator

I'm not sure if this can be done with out-of-the-box processors, but I would do it with a Groovy-based InvokeScriptedProcessor with code like this:


import groovy.json.JsonOutput
import groovy.json.JsonSlurper

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.apache.nifi.processor.util.StandardValidators

/**
 * InvokeScriptedProcessor body: splits the "objectIDs" array of an incoming
 * JSON FlowFile into multiple output FlowFiles, each containing at most
 * "Chunk Size" elements, preserving the {"objectIDs": [...]} structure.
 */
class GroovyProcessor implements Processor {

    // Maximum number of objectIDs per output FlowFile. Defaults to 5
    // (the chunk size requested in the question).
    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("chunk-size")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set<Relationship> getRelationships() {
        // Both relationships must be registered or NiFi cannot route output.
        Set<Relationship> relationships = new HashSet<>()
        relationships.add(REL_SUCCESS)
        relationships.add(REL_FAILURE)
        return relationships
    }

    Collection<ValidationResult> validate(ValidationContext context) {
        return null
    }

    PropertyDescriptor getPropertyDescriptor(String name) {
        // Resolve by name so the framework can look up "Chunk Size".
        return CHUNK_SIZE.name == name ? CHUNK_SIZE : null
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        // No-op: nothing to recompute when properties change.
    }

    List<PropertyDescriptor> getPropertyDescriptors() {
        // CHUNK_SIZE must be listed or the property won't appear in the UI.
        List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }

    String getIdentifier() {
        return null
    }

    void onScheduled(ProcessContext context) throws ProcessException {
        // No-op: no resources to set up.
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
        // No-op: no resources to tear down.
    }

    void onStopped(ProcessContext context) throws ProcessException {
        // No-op: no resources to tear down.
    }

    void setLogger(ComponentLog logger) {
        log = logger
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List<FlowFile> flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile ->
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                // Parse the incoming FlowFile content as JSON.
                session.read(flowFile, { inputStream -> data = jsonSlurper.parse(inputStream) } as InputStreamCallback)
                // Groovy's collate() splits the list into sublists of at most
                // chunkSize elements; the last sublist holds any remainder.
                List<List<String>> chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk ->
                    Map chunkData = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write(jsonOutput.toJson(chunkData).getBytes("UTF-8")) } as OutputStreamCallback)
                    // putAllAttributes returns the updated FlowFile reference;
                    // discarding it would transfer a stale handle.
                    newFlowFile = session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                // Drop the original FlowFile once all chunks are emitted.
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

// InvokeScriptedProcessor requires the script to bind its Processor
// implementation to the variable named "processor".
processor = new GroovyProcessor()