Created 06-08-2017 03:29 PM
For this kind of custom logic, it is probably possible to use some combination of standard processors, but that might result in an overcomplicated and brittle flow. An alternative is to use a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language such as Groovy, Javascript, Jython, JRuby, Clojure, or Lua, and also the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:
import java.nio.charset.StandardCharsets def flowFile = session.get() if(!flowFile) return def flowFiles = [] as List<FlowFile> def inputStream = session.read(flowFile) inputStream.eachLine { line -> s = line.tokenize('|') def prefix = s[0..2] def numRecords = Integer.parseInt(s[3]) def leftoverFieldsIndex = 4 + numRecords (4..leftoverFieldsIndex-1).each { i -> def newFlowFile = session.create(flowFile) newFlowFile = session.write(newFlowFile, { outputStream -> outputStream.write( (prefix + s[i] + s[leftoverFieldsIndex..-1]).join('|').getBytes(StandardCharsets.UTF_8) ) } as OutputStreamCallback) flowFiles << newFlowFile } } session.transfer(flowFiles, REL_SUCCESS) session.remove(flowFile)
This script extracts the number of fields to follow, then for each of those fields, it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields as you described above.