Split one NiFi flow file into multiple flow files based on an attribute
Labels: Apache HBase, Apache NiFi, Apache Phoenix
Created ‎06-08-2017 02:03 PM
Hi, I have a flow file like this, where the fields are separated by the pipe character:
server|list|number|3|abc|xyz|pqr|2015-06-06 13:00:00
In the record above, the number 3 is followed by three values: abc, xyz, and pqr. My requirement is to split this flow file into separate flow files based on that number; the output should look like this:
server|list|number|abc|2015-06-06 13:00:00
server|list|number|xyz|2015-06-06 13:00:00
server|list|number|pqr|2015-06-06 13:00:00
I have reached the stage where I have converted the flow file to JSON, split the JSON, and captured abc|xyz|pqr in one attribute. I would appreciate help on how to split them further into individual records in NiFi so that I can insert them into HBase.
Created ‎06-08-2017 03:29 PM
For this kind of custom logic, it is probably possible to use some combination of standard processors, but that might result in an overcomplicated and brittle flow. An alternative is to use a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language (Groovy, JavaScript, Jython, JRuby, Clojure, or Lua) as well as the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:
import java.nio.charset.StandardCharsets
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
inputStream.eachLine { line ->
    def s = line.tokenize('|')
    def prefix = s[0..2]                      // fields before the count
    def numRecords = Integer.parseInt(s[3])   // how many repeated values follow
    def leftoverFieldsIndex = 4 + numRecords  // first field after the repeated values
    (4..leftoverFieldsIndex - 1).each { i ->
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            // prefix fields + current value + trailing ("postfix") fields
            outputStream.write((prefix + s[i] + s[leftoverFieldsIndex..-1])
                    .join('|').getBytes(StandardCharsets.UTF_8))
        } as OutputStreamCallback)
        flowFiles << newFlowFile
    }
}
inputStream.close()
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)
This script extracts the number of fields to follow, then for each of those fields, it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields as you described above.
Created ‎06-13-2017 05:11 AM
I am facing a similar problem; your help would be highly appreciated, as my work is on hold due to this issue.
My data (a sample syslog) looks like this:
Mar 29 2004 09:55:03: %PIX-6-302006: Teardown UDP connection for faddr 194.224.52.6/3645b4
Mar 29 2004 09:55:03: %PIX-6-302006: Teardown UDP connection for faddr 194.224.52.4/4454889
Mar 29 2004 09:54:26: %PIX-4-106023: Deny icmp src outside:Some-Cisco dst inside:10.0.0.187
I want the output to be divided into three fields:
Field1: Mar 29 2004 09:55:03 (fixed length)
Field2: PIX-6-302006 (fixed length)
Field3: Teardown UDP connection for faddr 194.224.52.6/3645b4 (the remaining data, of variable length)
I need a regular expression (for the ExtractText processor) that extracts a substring of a line based on length, start position, and end position (i.e., similar to Java's String.substring method).
The reason I am asking for an index- and length-based substring is that the syslog format may vary per machine, but I could still use a similar regular expression to extract the fields.
I am using ListFile -> SplitLine -> SplitText -> PutFile as my processor flow. Please suggest a better sequence of processors if there is one.
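To illustrate what I mean, here is a sketch (untested, in plain Java rather than an actual ExtractText configuration, and based only on the sample lines above) of a position-based regex: the first group captures a fixed 20-character timestamp, the second the message code between "%" and the next colon, and the third everything that remains.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SyslogSplit {
    public static void main(String[] args) {
        // Hypothetical sample line in the PIX syslog format shown above
        String line = "Mar 29 2004 09:55:03: %PIX-6-302006: "
                + "Teardown UDP connection for faddr 194.224.52.6/3645b4";
        // Group 1: first 20 characters (fixed-length timestamp)
        // Group 2: message code between "%" and the next ":"
        // Group 3: the remaining, variable-length message
        Pattern p = Pattern.compile("^(.{20}): %([^:]+): (.*)$");
        Matcher m = p.matcher(line);
        if (m.matches()) {
            System.out.println(m.group(1));
            System.out.println(m.group(2));
            System.out.println(m.group(3));
        }
    }
}
```

In ExtractText, each capture group would presumably become its own attribute (e.g. a property named "field" yields field.1, field.2, field.3), though I have not verified this against my full data set.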
Once again, thank you in advance for your help.
Sravanthi
Created ‎06-13-2017 03:40 PM
Do you mind putting this into a new HCC question? Although the problems may be somewhat similar, you will likely get answers that don't apply to the original question here, so to avoid confusion, I think a new question is the right way to go 🙂 Thanks in advance!
