Split one NiFi flow file into multiple flow files based on an attribute

Rising Star

Hi, I have a flow file like this:

server|list|number|3|abc|xyz|pqr|2015-06-06 13:00:00 (the fields are separated by the pipe character).

In the above record, there is a number 3, and it is followed by abc, xyz, and pqr.

My requirement is to split the above flow file into multiple flow files based on that number; the output should look like this:

server|list|number|abc|2015-06-06 13:00:00

server|list|number|xyz|2015-06-06 13:00:00

server|list|number|pqr|2015-06-06 13:00:00

I have reached a stage where I have converted the above flow file to JSON, split the JSON, and captured abc|xyz|pqr in one attribute. I would like help on how to split them further into individual records in NiFi so that I can insert them into HBase.

1 ACCEPTED SOLUTION

Master Guru

For this kind of custom logic, it might be possible to use some combination of standard processors, but that could result in an overcomplicated and brittle flow. An alternative is to use a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language such as Groovy, JavaScript, Jython, JRuby, Clojure, or Lua, as well as the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:

import java.nio.charset.StandardCharsets
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
inputStream.eachLine { line ->
   // Fields are pipe-delimited: the first three are the "prefix", the fourth
   // is the count of values that follow, and anything after those values
   // (here, the timestamp) is the "postfix".
   def s = line.tokenize('|')
   def prefix = s[0..2]
   def numRecords = Integer.parseInt(s[3])
   def leftoverFieldsIndex = 4 + numRecords
   (4..leftoverFieldsIndex - 1).each { i ->
      // Create one new flow file per value, keeping the prefix and postfix fields
      def newFlowFile = session.create(flowFile)
      newFlowFile = session.write(newFlowFile, { outputStream ->
         outputStream.write( (prefix + s[i] + s[leftoverFieldsIndex..-1]).join('|').getBytes(StandardCharsets.UTF_8) )
      } as OutputStreamCallback)
      flowFiles << newFlowFile
   }
}
inputStream.close()
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

This script extracts the number of fields to follow, then for each of those fields, it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields as you described above.
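
For reference, here is a quick standalone illustration in plain Groovy (outside NiFi) of how the sample record from the question gets sliced; it simply reproduces the list operations used in the script above:

def s = 'server|list|number|3|abc|xyz|pqr|2015-06-06 13:00:00'.tokenize('|')
def prefix = s[0..2]                      // [server, list, number]
def numRecords = Integer.parseInt(s[3])   // 3
def leftoverFieldsIndex = 4 + numRecords  // 7
(4..leftoverFieldsIndex - 1).each { i ->
   println((prefix + s[i] + s[leftoverFieldsIndex..-1]).join('|'))
}
// server|list|number|abc|2015-06-06 13:00:00
// server|list|number|xyz|2015-06-06 13:00:00
// server|list|number|pqr|2015-06-06 13:00:00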


3 REPLIES


New Contributor

Hi @Matt Burgess,

I am facing a similar problem. Your help would be highly appreciated, as my work is on hold due to this issue.

My data (a sample syslog) looks like the following:

Mar 29 2004 09:55:03: %PIX-6-302006: Teardown UDP connection for faddr 194.224.52.6/3645b4

Mar 29 2004 09:55:03: %PIX-6-302006: Teardown UDP connection for faddr 194.224.52.4/4454889

Mar 29 2004 09:54:26: %PIX-4-106023: Deny icmp src outside:Some-Cisco dst inside:10.0.0.187

I want the output to be divided into three fields:

Field1: Mar 29 2004 09:55:03 (This is of fixed length)

Field2: PIX-6-302006 (This is of fixed length)

Field3: Teardown UDP connection for faddr 194.224.52.6/3645b4 (the remaining data, which is of variable length)

I need a regular expression (for the ExtractText processor) that extracts a substring of a line based on start position, end position, and length (i.e., similar to Java's substring method).

The reason I am asking for an index- and length-based substring is that the syslog format may vary between machines, but I could still use a similar regular expression to extract the fields.
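
As a rough sketch of what such a pattern could look like (the fixed 20-character timestamp width and the "%CODE:" layout are assumptions read off the sample lines above, so it would need validating against real data), an expression with three capture groups can be tried out in plain Groovy like this:

// Hypothetical pattern: first 20 characters (timestamp), then the %-prefixed code,
// then the remainder of the line.
def line = 'Mar 29 2004 09:55:03: %PIX-6-302006: Teardown UDP connection for faddr 194.224.52.6/3645b4'
def m = line =~ /(.{20}): %([^:]+): (.*)/
if (m.matches()) {
   println "Field1: ${m.group(1)}"   // Mar 29 2004 09:55:03
   println "Field2: ${m.group(2)}"   // PIX-6-302006
   println "Field3: ${m.group(3)}"   // Teardown UDP connection for faddr 194.224.52.6/3645b4
}

In ExtractText, such a pattern would go into a user-defined (dynamic) property, and each capture group then appears as a flow file attribute named after that property (e.g. field.1, field.2, field.3).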

I am using ListFile->SplitLine->SplitText->PutFile as my processor flow. Please suggest a better processor flow if there is one.

Once again, thank you in advance for your help.

Sravanthi

Master Guru

Do you mind putting this into a new HCC question? Although the problems may be somewhat similar, you will likely get answers that don't apply to the original question here, so to avoid confusion, I think a new question is the right way to go 🙂 Thanks in advance!