Created on 03-23-2018 04:19 AM - edited 09-16-2022 06:01 AM
Hello Guys,
I want to create a flow in NiFi that splits a single file into two files based on the values of one column. Is there a way to do this? The input file is a .txt file.
Please provide your approach.
Regards,
Shantanu.
Created on 03-23-2018 06:08 AM - edited 08-18-2019 12:18 AM
Your question can mean two things.
I am answering both of them.
Solution 1
Here is a sample flow.
What am I doing in this flow?
1. Generate flow file
In this processor, I am generating a sample flow file with the content
This is success. This is failure.
2. Split text
In this processor, I am splitting every individual row into a flow file.
3. Route on content
This processor sends the flow files to their respective relationships based on the content. A snapshot of the processor config follows.
In this processor, I am checking if the flow file contains
And this way you can have your file split based on the content of it.
PS - If you have structured or semi-structured data, e.g. CSV or JSON, you can change the logic to check the value of a specific column and then route the flow files accordingly.
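The logic of Solution 1 can be sketched outside NiFi like this. It is only a plain Python simulation of the idea (split into lines, then route each line by what it contains), not NiFi code. The function name route_lines and the "unmatched" bucket are made up for illustration, and it assumes the sample content has one sentence per line.

```python
def route_lines(content, keywords):
    """Split content into lines (the SplitText step) and send each line to
    the bucket of the first keyword it contains (the RouteOnContent step)."""
    routes = {keyword: [] for keyword in keywords}
    routes["unmatched"] = []  # hypothetical bucket for lines matching nothing
    for line in content.splitlines():
        if not line.strip():
            continue  # skip blank lines
        for keyword in keywords:
            if keyword in line:
                routes[keyword].append(line)
                break
        else:
            routes["unmatched"].append(line)
    return routes


# Assumed sample content: one sentence per line
sample = "This is success.\nThis is failure.\n"
result = route_lines(sample, ["success", "failure"])
print(result)
```

Each key in the result plays the role of one relationship coming out of the routing processor.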
Solution 2
Split the content based on a custom value.
In this flow, I am using the SplitContent processor. It accepts either of the following two options as the split value.
My input flow file from GenerateFlowFile processor has following content.
This is success. # This is failure
The SplitContent processor uses # as the split value. A snapshot of the processor config follows.
I get 2 flow files, split at the custom value passed to the processor.
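As a rough illustration of what splitting on a custom value means, here is a small Python sketch. It is not how SplitContent is implemented; the function name split_on_sequence is hypothetical, and trimming the whitespace around each piece is my own assumption to keep the output tidy.

```python
def split_on_sequence(content: bytes, sequence: bytes) -> list:
    """Split raw content wherever the byte sequence occurs and drop empty pieces.
    Trimming surrounding whitespace is an assumption made for readability."""
    return [part.strip() for part in content.split(sequence) if part.strip()]


pieces = split_on_sequence(b"This is success. # This is failure", b"#")
for piece in pieces:
    print(piece)
```

Each element of pieces corresponds to one output flow file in the flow above.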
Hope this helps!
Created 03-23-2018 01:14 PM
Thank you for your quick response.
My scenario is a little different. I have a .txt file delimited with '|' that has about 10 columns, one of which is STORE_ID. I want to split the file into 2 parts based on the STORE_ID value, e.g. STORE_ID 1 to 10 goes to one file and 10 to 20 goes to a different file. How can I handle this value range with your solution above? Please advise.
Regards,
Shantanu
Created 03-23-2018 07:45 PM
You can use the following flow.
SplitText -> ExtractText -> RouteOnAttribute
Here is a short description of what these processors should achieve.
1. SplitText - Same as the detailed flow above. Split each line into a new file.
2. ExtractText - Create a new attribute with the value of your StoreID column. Create a regex which will read your data, fetch the StoreID column from nth position and create StoreID attribute out of it.
3. RouteOnAttribute - Use expression language here for redirecting your flow files. For example
${StoreID:ge(1):and(${StoreID:le(10)})} //Route to the processor handling stores 1 to 10
${StoreID:ge(11):and(${StoreID:le(20)})} //Route to the processor handling stores 11 to 20
And so on.
This should redirect your data per your need.
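To make the ExtractText and RouteOnAttribute steps more concrete, here is a Python sketch of the same idea: a regex pulls the nth pipe-delimited column out of a line, and a range check decides the route. This is only an illustration, not NiFi configuration. The function names, the route names, and the assumption that STORE_ID sits in column 0 of the sample lines are all mine; adjust the index to wherever STORE_ID is in your file.

```python
import re


def extract_store_id(line, column_index):
    """Return the pipe-delimited field at column_index (0-based), or None.
    The pattern skips column_index 'field|' groups, then captures the next field."""
    pattern = r"^(?:[^|]*\|){%d}([^|]*)" % column_index
    match = re.match(pattern, line)
    return match.group(1) if match else None


def route(store_id):
    """Mimic the two range checks; anything else is left unmatched."""
    if store_id is None or not store_id.strip().isdigit():
        return "unmatched"  # e.g. a header line
    value = int(store_id)
    if 1 <= value <= 10:
        return "stores_1_to_10"
    if 11 <= value <= 20:
        return "stores_11_to_20"
    return "unmatched"


# Assumed sample lines with STORE_ID in the first column
lines = ["3|online", "15|online", "42|online", "STORE_ID|Name"]
routed = [(line, route(extract_store_id(line, 0))) for line in lines]
for line, target in routed:
    print(line, "->", target)
```

The same kind of pattern, with the repeat count set to the position of your column, is what you would try in ExtractText, with the captured value stored as the StoreID attribute.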
Created on 03-25-2018 06:02 PM - edited 08-18-2019 12:18 AM
You can use the QueryRecord processor for this use case. It runs SQL queries on the content of the flow file, so we can route the flow file contents based on STORE_ID.
Here is what I tried.
My flow file content is as follows:
STORE_ID|Name
1|online
2|online
3|online
4|online
5|online
6|online
7|online
8|online
9|online
10|online
11|online
12|online
13|online
14|online
15|online
16|online
17|online
18|online
19|online
20|online
Query Record processor configs:-
I have added new properties so that we can determine which relationship each record is routed to.
Store_id 11 to 20
SELECT * FROM FLOWFILE WHERE STORE_ID >'10' and STORE_ID <'21'
(or)
SELECT * FROM FLOWFILE WHERE STORE_ID >=11 and STORE_ID <=20
Store_id 1 to 10
SELECT * FROM FLOWFILE WHERE STORE_ID >'0' and STORE_ID <'11'
(or)
SELECT * FROM FLOWFILE WHERE STORE_ID >= 1 and STORE_ID <= 10
Since your file is pipe-delimited, we need to specify the separator in the CsvReader controller service.
CsvReaderConfigs:-
Since we are using the Schema Name property as the Schema Access Strategy, we need to set the schema.name attribute on the flow file using an UpdateAttribute processor.
Update Attribute configs:-
Avro Schema Registry configs:-
Validate Field Names: true
sch:
{
  "type": "record",
  "name": "nifiRecord",
  "fields": [
    {"name": "STORE_ID", "type": ["null", "int"]},
    {"name": "Name", "type": ["null", "string"]}
  ]
}
This Avro schema registry is used by both the CsvReader and CsvSetWriter controller services.
CsvSetWriter Configs:-
In the CsvSetWriter controller service we also use the same Avro schema registry, set the separator to |, and change the Schema Access Strategy to Use 'Schema Name' Property.
Once the flow file content is fed to the QueryRecord processor, the processor runs the SQL queries we have written and routes the records to the respective relationships.
The Store_id 1 to 10 relationship gets the records below:
STORE_ID|Name
1|online
2|online
3|online
4|online
5|online
6|online
7|online
8|online
9|online
10|online
The Store_id 11 to 20 relationship gets the records below:
STORE_ID|Name
11|online
12|online
13|online
14|online
15|online
16|online
17|online
18|online
19|online
20|online
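If you want to check the two range queries before building the flow, here is a small Python sketch that runs them against the same sample data. Note the assumptions: it uses Python's built-in sqlite3 as a stand-in for the SQL engine inside QueryRecord, it loads the rows into a table named FLOWFILE by hand, and the relationship names in the dictionary are only labels I picked.

```python
import csv
import io
import sqlite3

# Same sample content as above: a header plus STORE_ID 1..20, pipe-delimited
data = "STORE_ID|Name\n" + "".join(f"{i}|online\n" for i in range(1, 21))

# Parse the pipe-delimited text (the job the CSV reader does in the flow)
rows = list(csv.DictReader(io.StringIO(data), delimiter="|"))

# Load the records into an in-memory table named FLOWFILE (stand-in only)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (STORE_ID INTEGER, Name TEXT)")
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?)",
    [(int(r["STORE_ID"]), r["Name"]) for r in rows],
)

# One query per relationship; the keys are illustrative labels
queries = {
    "store_1_to_10": "SELECT * FROM FLOWFILE WHERE STORE_ID >= 1 AND STORE_ID <= 10",
    "store_11_to_20": "SELECT * FROM FLOWFILE WHERE STORE_ID >= 11 AND STORE_ID <= 20",
}
results = {name: conn.execute(sql).fetchall() for name, sql in queries.items()}

for name, records in results.items():
    print(name, len(records), "records")
```

Each key stands for one relationship, and the rows returned for it are the records that would be written to that relationship's flow file.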
My sample Flow:-
1. GenerateFlowFile //to produce input data
2. UpdateAttribute //to set up the schema name
3. QueryRecord //to route the content based on the query
For your reference I have attached my XML file. Upload it and change it to suit your requirements.
Let us know if you have any issues or questions!
Created 04-18-2018 12:02 PM
If the answer helped to resolve your issue, click the Accept button below to accept it. That helps other community users find solutions quickly for these kinds of issues.