Created 11-26-2022 10:10 AM
Dear Expert,
Here is my scenario for nifi development, I have generated a file with delimited '|' and I have to filter the records based on 3rd column value and separate dataset of each match
Example:
Note- No header/column name of rows.
1|1|Class|Electronic
1|1|Class|CS
1|1|Teacher|B
1|1|Teacher|B
1|1|Student|abc
1|1|Student|abc
1|4|Student|ex
1|3|Student|xyz
So desired output should be
Dataset 1
1|1|Class|Electronic
1|1|Class|CS
Dataset 2
1|1|Teacher|B
1|1|Teacher|B
Dataset 3
1|1|Student|abc
1|1|Student|abc
1|4|Student|ex
1|3|Student|xyz
later on I will create files of above data and load in HDFS location
Regards
Mars
Created 11-26-2022 05:49 PM
Hi,
Not sure if what Im suggesting is the best solution so anyone who feels like there is better solution please advise. You can solve this in two different ways:
1- if you know the schema and you dont mind adding a header so you can use the QueryRecrod processor , then you can add header first (see: https://stackoverflow.com/questions/58707242/how-to-add-headers-to-a-csv-using-apache-nifi ) and then use three different queryrecrod processor for each value to query the different datasets.
2- if you dont want to add a header, then what you can do is the following:
a. use SplitText processor to split each line
b. do a RouteOnContent for each value (student, teacher, class...) to filter the records for each dataset. For example you will have RouteOnContent which will have dynamic property called "Student" and the search value "(Student)" where the Match Requirement property is set to "...contain match". This will filter only records that has Student in them.
c. Use MergeContent processor to merge back the result set.
Let me know if you have any questions.
If you find this answers your question please accept solution.
Thanks
Created on 12-03-2022 09:35 AM - edited 12-03-2022 09:37 AM
I am dealing with Kafka dataset where there are multiple types of message data is processing (coming)
Sample data:
eventType 1-
{
"type": "record",
"name": "Dispatch_Accepted",
"namespace": "accepted.avro",
"fields": [
{
"name": "John",
"type": "string",
"doc": "Name of the user account"
},
{
"name": "email",
"type": "string",
"doc": "The email of the user logging message on the blog"
},
{
"name": "timestamp",
"type": "long",
"doc": "time in seconds"
}
],
"doc:": "A basic schema of Dispatch_Rejected"
}
EventType-2
{
"type": "record",
"name": "Dispatch_Rejected",
"namespace": "rejected.avro",
"fields": [
{
"name": "Merry",
"type": "string",
"doc": "Name of the user"
},
{
"name": "email",
"type": "string",
"doc": "The email of the user logging message on the blog"
},
{
"name": "timestamp",
"type": "long",
"doc": "time in seconds"
}
],
"doc:": "A basic schema Rejected data"
}
Schema of the data getting validated from Confluent Schema Regisry (Working Fine), I need to apply filter on Schema name (Dispatch_Rejected and Dispatch_Accepted) and crete two separate data files for each
so I am using QueryRecord Processor which below query
<Dispatch_Rejected>=Select * from FLOWFILE WHERE name='Dispatch_Rejected'
<Dispatch_Accepted>=Select * from FLOWFILE WHERE name='Dispatch_Accepted'
This is not working.. can't identify the schema name.
Controller service is working fine.
1- How I can pick the schema name from Controller service
2- Should I need to assign the value ${schema.name} in another variable <My_schema> and need to write SELECT Statement like
<Dispatch_Rejected>=Select * from FLOWFILE WHERE My_Schema.name='Dispatch_Rejected'
<Dispatch_Accepted>=Select * from FLOWFILE WHERE My_Schema.name='Dispatch_Accepted'
Summary-- I want to filter the data based on eventType, and create separate data files
Please help