Support Questions

Find answers, ask questions, and share your expertise

Apache NiFi to split incoming data from a file based on condition into 3 flows

avatar
New Contributor

suppose there are 100 records coming from source file .My requirement is to split the record into 3 different flow .

flow 1: move all 100 record as is.

based on a condition i.e where column4='xyz' , the incoming data will be split into 2 more flow.i.e

flow 2: will get 75 records

flow 3 : will get 25 records.

the output of above flow will be 3 different files.

thanks,

1 REPLY 1

avatar
Master Guru

@Hemu Singh

For this use case you need to use Query Record processor and Based on the Record Reader controller services configured this processor will execute sql queries on the Flowfile Contents and The result of the SQL query then becomes the content of the output FlowFile in the format as specified in the Record Writer controller service.

Flow:-

68518-flow.png

Flow Explanation:-

1.Generate Flowfile //added some test data

2.UpdateAttribute //added schema to the flowfile

3.Filter Column QueryRecord //

3.1.need to configure/enable Record Reader/Writer controller services.

3.2.added new property that will run sql where query on the flowfile

68519-filtercolumn.png

3.3.Use the original relation to store the file as is i.e having 100 records in it.
4.QueryRecord //

4.1.add two new properties that can run row_number window function(i'm having id column in the flowfile) and get first 75 records in one relation

first 75 records
select * from (select *,Row_Number() over(order by id asc) as rn from FLOWFILE) r where r.rn <= 75

76 to 100 record

select * from 
(select *,Row_Number() over(order by id asc) as rn from FLOWFILE) r where r.rn > 75 and r.rn <= 100 

68520-queryrecord.png

Use the above two relations first75records and 76to100 record relationships for further processing.

In addition Query Record supports Limit offset..etc also so you can use either row_number/limit offset ..etc to get only the desired 75 records from the flowfile.

Please refer to this and this for QueryRecord processor configs and usage.