Support Questions

Custom logic like this can probably be built from a combination of standard processors, but the resulting flow is likely to be overcomplicated and brittle. An alternative is a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language (Groovy, JavaScript, Jython, JRuby, Clojure, or Lua) as well as the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:

import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return
def flowFiles = [] as List<FlowFile>
// eachLine closes the input stream once all lines have been read
def inputStream = session.read(flowFile)
inputStream.eachLine { line ->
   // tokenize() skips empty fields, so this assumes no field is empty
   def s = line.tokenize('|')
   def prefix = s[0..2]
   def numRecords = Integer.parseInt(s[3])
   def leftoverFieldsIndex = 4 + numRecords
   // drop() returns an empty list when there are no trailing fields
   def postfix = s.drop(leftoverFieldsIndex)
   // exclusive range: produces nothing when numRecords is 0
   (4..<leftoverFieldsIndex).each { i ->
      def newFlowFile = session.create(flowFile)
      newFlowFile = session.write(newFlowFile, { outputStream ->
         outputStream.write( (prefix + s[i] + postfix).join('|').getBytes(StandardCharsets.UTF_8) )
      } as OutputStreamCallback)
      flowFiles << newFlowFile
   }
}
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

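This script reads the number of fields to follow from the fourth field of each line. For each of those fields, it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields, as you described above. The original flow file is removed once the new flow files have been transferred to success.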
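
If you want to check the splitting logic before wiring it into a flow, here is a minimal sketch of the same per-line transformation as a standalone Groovy script that runs outside NiFi. The helper name splitRecord and the sample line are made up for illustration; they are not part of the NiFi API or taken from your data, and the sketch assumes three prefix fields followed by the count, as in the script above.

```groovy
// Standalone sketch of the per-line logic, runnable without NiFi.
// splitRecord is a hypothetical helper name used only for this example.
List<String> splitRecord(String line) {
    def fields = line.tokenize('|')               // note: tokenize() skips empty fields
    def prefix = fields[0..2]                     // the three leading fields
    def numRecords = Integer.parseInt(fields[3])  // how many values follow the count
    def firstPostfix = 4 + numRecords             // index of the first trailing field
    def postfix = fields.drop(firstPostfix)       // empty list if nothing trails
    // One output line per value: prefix fields, the value, then the postfix fields
    return (4..<firstPostfix).collect { i ->
        (prefix + fields[i] + postfix).join('|')
    }
}

// Made-up sample: 3 prefix fields, a count of 2, two values, 2 postfix fields
splitRecord('a|b|c|2|x|y|p|q').each { println it }
```

Each string returned here corresponds to the content of one flow file that the ExecuteScript version would create for that input line.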
