
Nifi : QueryDatabaseProcessor - Add new fields to Avro Files

Contributor

I have used the QueryDatabaseTable processor to read data from a SQL Server DB.

It outputs Avro files, and I need to add static fields to each Avro record.

For example:

=============================

Avro Record = id,name

Value= 1,ABC

=============================

I need to add a type field to the record:

========================================

New Avro Record = id,name,type

Value= 1,ABC,IN

========================================

Is there a way to manipulate Avro files?

1 ACCEPTED SOLUTION

Master Guru
@manisha jain

Yes, you can manipulate the results coming out of your QueryDatabaseTable processor itself.

If you want to add static fields to each Avro record, then in your QueryDatabaseTable processor,

change the Columns to Return property to:

*,'in' type //return all columns from the table and add a type column with the value 'in' for each row.

Processor Config:-

[Screenshot: 43431-query.png]

As you can see in the screenshot above, that is how the Columns to Return property is configured.
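To see what this Columns to Return value does to the generated query, here is a minimal sketch using SQLite in place of SQL Server (an assumption purely for illustration; the table and column names are made up to match the example record in the question):

```python
import sqlite3

# In-memory stand-in for the source table (SQLite instead of SQL Server,
# just to show the shape of the generated SELECT list).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'ABC')")

# QueryDatabaseTable builds roughly: SELECT <Columns to Return> FROM <table>.
# With Columns to Return set to "*, 'in' type", every row gains a constant
# 'type' column alongside the real table columns.
rows = conn.execute("SELECT *, 'in' type FROM users").fetchall()
print(rows)  # [(1, 'ABC', 'in')]
```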

(or)

Another way of doing this is:

Once you get results from the QueryDatabaseTable processor, then:

1. Use a ConvertAvroToJSON processor //converts the Avro data into JSON.
2. Use a ReplaceText processor //search for the literal value } and use a replacement value in JSON format, e.g. ,"type":"in"}. The output is a JSON array of records, and } marks the end of each record, so replacing } with ,"type":"in"} appends the field to every record without changing the record structure.
3. Use a ConvertJSONToAvro processor //converts the JSON data back into Avro; the data now contains the new field added in the JSON step.
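The three steps above can be sketched in plain Python (a rough stand-in for what ConvertAvroToJSON, ReplaceText, and ConvertJSONToAvro do; the record values are the ones from the question, and the flat single-level records are an assumption — a naive } replacement would misfire on nested JSON):

```python
import json

# Step 1 stand-in: ConvertAvroToJSON produces a JSON array of records.
avro_as_json = '[{"id": 1, "name": "ABC"}]'

# Step 2 stand-in: ReplaceText searches for the literal "}" that closes each
# record and replaces it with ',"type":"in"}', appending the new field.
with_type = avro_as_json.replace('}', ',"type":"in"}')

# Step 3 stand-in: ConvertJSONToAvro would re-encode this JSON as Avro;
# here we just parse it to confirm each record now carries the new field.
records = json.loads(with_type)
print(records)  # [{'id': 1, 'name': 'ABC', 'type': 'in'}]
```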


12 REPLIES


Contributor

Thanks @Shu

Will try this approach and revert.


Hi @manisha jain

The approach described above (Avro -> JSON -> Avro) is no longer required with the new record-based processors in NiFi. You can use the UpdateRecord processor to add a new field to your flow files whatever their format is (Avro, JSON, CSV, etc.).

Just define your Avro schema with the new field and use it in the Avro writer. Then use UpdateRecord to set the value, as in the example below where I add a city field with the static value Paris:

[Screenshot: 42455-screen-shot-2017-11-04-at-101328-pm.png]

And the result will look like:

[Screenshot: 42456-screen-shot-2017-11-04-at-101508-pm.png]

Note that in my example the data is in JSON format, but the same approach works for Avro; just use the Avro reader/writer instead of the JSON ones.
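For the record-based approach, the writer's schema must already contain the new field. A sketch of what the Avro schema might look like with the added city field (the field names here follow the screenshot example; the record name and other fields are assumptions):

```json
{
  "type": "record",
  "name": "person",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "city", "type": "string"}
  ]
}
```

In UpdateRecord, a user-defined property such as /city with value Paris (with Replacement Value Strategy set to Literal Value) then populates the field for every record.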

If you are new to record-based processors, read these two articles:

I hope this is helpful.

Contributor

Thanks @Abdelkrim Hadjidj. Very informative.

Expert Contributor

Great answers, thanks all.

Expert Contributor

@Shu

Hey, I have a requirement to add a new column to the extracted contents of an Oracle table. The new column name is LOAD_TMS and should contain the current timestamp. I used your approach and typed the following in the Columns to Return property: *,'${now():toNumber():format('yyyy-MM-dd HH:mm:ss')}' LOAD_TMS. However, I am getting errors. I also tried copying your example, but even then I got irrelevant errors like 'FROM keyword is missing'.

Can you please assist here?

Thanks

Abhi

Expert Contributor

@Shu

Meanwhile, I tried your other option, converting Avro to JSON and then using ReplaceText, and that worked. That was a cool approach 🙂. However, we are doing streaming ingestion and want to limit the number of processors, so we prefer option 1: adding the extra column at the QueryDatabaseTable processor layer itself.

cheers

Abhi

Master Guru
@Abhinav Joshi

Which version of NiFi are you using?

In NiFi 1.1, the Columns to Return property doesn't support expression language, which is probably why you are having issues adding the '${now():toNumber():format('yyyy-MM-dd HH:mm:ss')}' LOAD_TMS expression.

[Screenshot: 72460-qdt.png]

If your Columns to Return property doesn't support expression language, then one possible workaround is to convert to JSON, add the field, and then convert back to Avro.

In newer versions, I think from NiFi 1.2+, the Columns to Return property supports expression language, and we also have the UpdateRecord processor, which lets us add new fields to the flowfile content without converting it.

avatar
Expert Contributor

Hey, thanks. I am using version 1.4 but I am still not able to get this working. Please find the screenshot below: columns-from-rdbms.png

So I can see that the expression language is working, since the timestamp shows up in the error section; however, the issue perhaps seems to be with the syntax, which is *, ............... Isn't this supported? Waiting for your brilliant feedback as always @Shu
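One hedged guess at the 'FROM keyword is missing' error, assuming it is Oracle's ORA-00923 (the full query isn't shown in the thread): Oracle does not accept a bare * combined with other select-list expressions; the * must be qualified with the table name or an alias. The Columns to Return value would then look something like this (the table name MY_TABLE is hypothetical):

```sql
-- Invalid in Oracle: SELECT *, 'x' LOAD_TMS FROM my_table  -> ORA-00923
-- Valid: qualify the * with the table name
MY_TABLE.*, '${now():toNumber():format('yyyy-MM-dd HH:mm:ss')}' LOAD_TMS
```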