Created 11-03-2017 01:06 PM
Have used QueryDatabase processor to read data from sql server DB.
It has read a avro files ,need to add static fields to the avro record
for ex..
=============================
Avro Record = id,name
Value= 1,ABC
=============================
need to add type to the record
========================================
New Avro Record = id,name,type
Value= 1,ABC,IN
========================================
Is there a way to manipulate avro files .
Created on 11-03-2017 11:18 PM - edited 08-18-2019 12:49 AM
Yeah you can manipulate results coming out from your Query database processor itself.
if you want to add static fields to the each avro record then in your Querydatabase processor.
change Columns to return property as
*,'in' type //return all columns from table and add type column with value in for each row.
Processor Config:-
as you can view above screenshot how we are configuring Columns to Return property above.
(or)
Another way doing this is
Once you get results from Query database processor then
1.Use convertavrotojson processor //we are converting avro data into json. 2.Use ReplaceText processor //to search for literal value } and do replacement value as your desired value in jsonformat ex:- ,"type":"in"} in this processor using replace text we are having json array of records and we are looking for } that means at end of each record and we are going to replace with ,"type":"in"} for each record so the json record structure will wont change. 3.Use convertjsontoavro processor //we are converting json data into avro data again and this data having new field that we have added to the json message.
Created on 11-03-2017 11:18 PM - edited 08-18-2019 12:49 AM
Yeah you can manipulate results coming out from your Query database processor itself.
if you want to add static fields to the each avro record then in your Querydatabase processor.
change Columns to return property as
*,'in' type //return all columns from table and add type column with value in for each row.
Processor Config:-
as you can view above screenshot how we are configuring Columns to Return property above.
(or)
Another way doing this is
Once you get results from Query database processor then
1.Use convertavrotojson processor //we are converting avro data into json. 2.Use ReplaceText processor //to search for literal value } and do replacement value as your desired value in jsonformat ex:- ,"type":"in"} in this processor using replace text we are having json array of records and we are looking for } that means at end of each record and we are going to replace with ,"type":"in"} for each record so the json record structure will wont change. 3.Use convertjsontoavro processor //we are converting json data into avro data again and this data having new field that we have added to the json message.
Created 11-07-2017 09:18 AM
Thanks @Shu
Created on 11-04-2017 09:19 PM - edited 08-18-2019 12:48 AM
The approach described above (Avro -> Json -> Avro) is no more required with new record based processors in NiFi. You can use UpdateRecord processor to add new attribute to your flow files whatever their format is (Avro, JSON, CSV, etc).
Just define your Avro Schema with the new attribute and use in Avro Writter. Then use UpdateAttribute to add the value as in the below example where I add city attribute with the static value Paris:
And the result will look like
Note that in my example data is in JSON format but the same approach works for Avro. Just use Avro reader/writer instead of Json.
If you are new to record based processors read these two articles:
I hope this is helpful.
Created 11-07-2017 09:20 AM
Thanks @Abdelkrim Hadjidj . Very informative .
Created 03-26-2018 03:05 AM
Great Answers thanks all
Created 04-26-2018 01:34 AM
Hey I have a requirement to add a new column to the extracted contents of an oracle table. The new column name is LOAD_TMS and should contain the current timestamp. I used your approach and typed the following in the "columns to return" property - *,'${now():toNumber():format('yyyy-MM-dd HH:mm:ss')}' LOAD_TMS however i am getting errors . I also tried copying your example but even then I started getting irrelevant errors like FROM key word is missing.
Can you please assist here
Thanks
Abhi
Created 04-26-2018 05:09 AM
Meanwhile , I tried using your other option and that worked which is convert Avro to JSON and then using a replace text. That was a cool approach 🙂 . However , we are doing a streaming ingestion and want to limit the number of processors , and hence prefer option 1 which is putting that extra column at the "Query Database Table" processor layer itself.
cheers
Abhi
Created on 04-26-2018 05:38 AM - edited 08-18-2019 12:48 AM
Which version of NiFi are you using?
In NiFi 1.1 Columns to Return property doesn't support expression language probably that's the reason why you are having issues while adding '${now():toNumber():format('yyyy-MM-dd HH:mm:ss')}' LOAD_TMS this expression.
If your columns to return doesn't support expression language then one of the possible work around is convert to json then add field again convert back to avro.
In New versions of i think from NiFi 1.2+ Columns to Return property supports expression language and also we are having update record processor by using this processor we can add new fields to the flowfile content without converting them.
Created 04-26-2018 07:25 AM
Hey thanks . I am using 1.4 version but still I am not able to get this working. Please find the screen shot below columns-from-rdbms.png
So , I can see that the regular expression is working as in the error section , the timestamp is coming up however the issue perhaps seems to be with the syntax which is *, ............... . Isnt this supported ? Waiting for your brilliant feedback as always @Shu