<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How to Split an array in a json file into multiple flowfiles with custom size of elements in an array in Nifi ? in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/How-to-Split-an-array-in-a-json-file-into-multiple-flowfiles/m-p/386345#M246010</link>
    <description>&lt;P&gt;I'm not sure if this can be done with out-of-the-box processors but I would do it with a Groovy based InvokeScriptedProcessor with code like this&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import groovy.json.JsonOutput
import groovy.json.JsonSlurper

class GroovyProcessor implements Processor {
    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set&amp;lt;Relationship&amp;gt; getRelationships() {
        Set&amp;lt;Relationship&amp;gt; relationships = new HashSet&amp;lt;&amp;gt;()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    Collection&amp;lt;ValidationResult&amp;gt; validate(ValidationContext context) {
        
    }
    
    PropertyDescriptor getPropertyDescriptor(String name) {
        
    }
    
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {

    }
    
    List&amp;lt;PropertyDescriptor&amp;gt; getPropertyDescriptors() {
        List&amp;lt;PropertyDescriptor&amp;gt; descriptors = new ArrayList&amp;lt;&amp;gt;()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }
    
    String getIdentifier() {
        
    }

    void onScheduled(ProcessContext context) throws ProcessException {
        
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
        
    }

    void onStopped(ProcessContext context) throws ProcessException {
        
    }

    void setLogger(ComponentLog logger) {
        
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List&amp;lt;FlowFile&amp;gt; flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile -&amp;gt;
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                session.read(flowFile, { inputStream -&amp;gt; data = jsonSlurper.parse(inputStream) } as InputStreamCallback)
                List&amp;lt;List&amp;lt;String&amp;gt;&amp;gt; chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk -&amp;gt;
                    data = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -&amp;gt; outputStream.write(jsonOutput.toJson(data).getBytes("UTF-8")) } as OutputStreamCallback)
                    session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()&lt;/LI-CODE&gt;</description>
    <pubDate>Tue, 09 Apr 2024 18:58:22 GMT</pubDate>
    <dc:creator>joseomjr</dc:creator>
    <dc:date>2024-04-09T18:58:22Z</dc:date>
    <item>
      <title>How to Split an array in a json file into multiple flowfiles with custom size of elements in an array in Nifi ?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Split-an-array-in-a-json-file-into-multiple-flowfiles/m-p/386298#M245989</link>
      <description>&lt;P class="lia-indent-padding-left-30px"&gt;&lt;SPAN&gt;&lt;SPAN class="ui-provider a b c d e f g h i j k l m n o p q r s t u v w x y z ab ac ae af ag ah ai aj ak"&gt;This is the content of my input json flowfile:&lt;BR /&gt;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;{"objectIDs":["20607","23679","24365","31444","34130","57462"]}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;My requirement is to divide it in such a way that each output flowfile has at most 5 elements in it.&lt;BR /&gt;so if an objectIDs array has 17 elements then I want 4 flowfiles with the same json format, with the first 3 having 5 elements and the last one with the remaining 2.&lt;BR /&gt;&lt;BR /&gt;for my example content provided above, I would want two flow files&lt;BR /&gt;First one:&lt;/P&gt;&lt;LI-CODE lang="javascript"&gt;{"objectIDs":["20607","23679","24365","31444","34130"]}&lt;/LI-CODE&gt;&lt;P&gt;Second one:&lt;/P&gt;&lt;LI-CODE lang="javascript"&gt;{"objectIDs":["57462"]}&lt;/LI-CODE&gt;&lt;P&gt;&lt;BR /&gt;How do I do this in NiFi?&lt;/P&gt;</description>
      <pubDate>Mon, 08 Apr 2024 14:54:42 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Split-an-array-in-a-json-file-into-multiple-flowfiles/m-p/386298#M245989</guid>
      <dc:creator>glad1</dc:creator>
      <dc:date>2024-04-08T14:54:42Z</dc:date>
    </item>
    <item>
      <title>Re: How to Split an array in a json file into multiple flowfiles with custom size of elements in an array in Nifi ?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Split-an-array-in-a-json-file-into-multiple-flowfiles/m-p/386345#M246010</link>
      <description>&lt;P&gt;I'm not sure if this can be done with out-of-the-box processors but I would do it with a Groovy based InvokeScriptedProcessor with code like this&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import groovy.json.JsonOutput
import groovy.json.JsonSlurper

class GroovyProcessor implements Processor {
    PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
        .name("CHUNK_SIZE")
        .displayName("Chunk Size")
        .description("The chunk size to break up the incoming list of values.")
        .required(true)
        .defaultValue("5")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .build()

    Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed are routed here')
        .build()

    Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()

    ComponentLog log
    JsonSlurper jsonSlurper = new JsonSlurper()
    JsonOutput jsonOutput = new JsonOutput()

    void initialize(ProcessorInitializationContext context) {
        log = context.logger
    }

    Set&amp;lt;Relationship&amp;gt; getRelationships() {
        Set&amp;lt;Relationship&amp;gt; relationships = new HashSet&amp;lt;&amp;gt;()
        relationships.add(REL_FAILURE)
        relationships.add(REL_SUCCESS)
        return relationships
    }

    Collection&amp;lt;ValidationResult&amp;gt; validate(ValidationContext context) {
        
    }
    
    PropertyDescriptor getPropertyDescriptor(String name) {
        
    }
    
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {

    }
    
    List&amp;lt;PropertyDescriptor&amp;gt; getPropertyDescriptors() {
        List&amp;lt;PropertyDescriptor&amp;gt; descriptors = new ArrayList&amp;lt;&amp;gt;()
        descriptors.add(CHUNK_SIZE)
        return Collections.unmodifiableList(descriptors)
    }
    
    String getIdentifier() {
        
    }

    void onScheduled(ProcessContext context) throws ProcessException {
        
    }

    void onUnscheduled(ProcessContext context) throws ProcessException {
        
    }

    void onStopped(ProcessContext context) throws ProcessException {
        
    }

    void setLogger(ComponentLog logger) {
        
    }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession()
        try {
            List&amp;lt;FlowFile&amp;gt; flowFiles = session.get(1)
            if (!flowFiles) return
            Integer chunkSize = context.getProperty(CHUNK_SIZE).asInteger()
            flowFiles.each { FlowFile flowFile -&amp;gt;
                Map customAttributes = [ "mime.type": "application/json" ]
                Map data = null
                session.read(flowFile, { inputStream -&amp;gt; data = jsonSlurper.parse(inputStream) } as InputStreamCallback)
                List&amp;lt;List&amp;lt;String&amp;gt;&amp;gt; chunkedObjectIDs = data.objectIDs.collate(chunkSize)
                chunkedObjectIDs.each { chunk -&amp;gt;
                    data = [
                        "objectIDs": chunk
                    ]
                    FlowFile newFlowFile = session.create()
                    newFlowFile = session.write(newFlowFile, { outputStream -&amp;gt; outputStream.write(jsonOutput.toJson(data).getBytes("UTF-8")) } as OutputStreamCallback)
                    session.putAllAttributes(newFlowFile, customAttributes)
                    session.transfer(newFlowFile, REL_SUCCESS)
                }
                session.remove(flowFile)
            }
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()&lt;/LI-CODE&gt;</description>
      <pubDate>Tue, 09 Apr 2024 18:58:22 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Split-an-array-in-a-json-file-into-multiple-flowfiles/m-p/386345#M246010</guid>
      <dc:creator>joseomjr</dc:creator>
      <dc:date>2024-04-09T18:58:22Z</dc:date>
    </item>
  </channel>
</rss>

