
Merging Stream with Db Records (concatenate csv/json)

Expert Contributor



I have a stream of events coming in via ActiveMQ. Each event is a CSV record that contains values for:

driverID, pickup time, Drop time, Cost, passenger count etc.

The driver information is stored in a database, for example:

driver id, driver name, age, gender, address, Driver vehicle, driving license details etc

I want to merge/concatenate the records coming in from ActiveMQ with the database records where event.driverID = db.driverID, so that I end up with records like this:

driverID, pickup time, Drop time, Cost, passenger count,driver name, age, gender, address, Driver vehicle, driving license details

Any suggestions on how I can achieve this?

I have one concern: it makes no sense to query the database for every streaming event. It makes more sense to fetch the entire table once and hold it in NiFi, so that each incoming event is compared against that data.


Expert Contributor

Any suggestions, guys?

Super Guru

A "database lookup" processor does not exist in NiFi as of this answer; however there are some alternatives you can consider:

  3. InvokeScriptedProcessor (whose initialize() method would cache the table, and onTrigger() would do the lookup)
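The cache-then-lookup idea behind that option can be sketched outside NiFi. The following is only an illustration in plain Python, not NiFi's scripting API: the `DriverLookup` class, its `initialize()` and `on_trigger()` methods, and the in-memory SQLite table are all assumptions made for the example. The column names are borrowed from the query later in this thread.

```python
import sqlite3


class DriverLookup:
    """Hypothetical helper: caches the driver table once, then enriches events."""

    def __init__(self, connection):
        self.connection = connection
        self.cache = {}

    def initialize(self):
        # Load the whole table once, keyed by driver ID (as strings, to match CSV fields).
        rows = self.connection.execute(
            "SELECT idDriverInfo, DriverName, DriverAge FROM driverinfo"
        )
        self.cache = {str(row[0]): [str(v) for v in row[1:]] for row in rows}

    def on_trigger(self, csv_line):
        # The first CSV field is assumed to be the driver ID.
        fields = [f.strip() for f in csv_line.split(",")]
        driver = self.cache.get(fields[0])
        if driver is None:
            return None  # no matching driver; a real flow would route this elsewhere
        # Append the cached driver columns to the event fields.
        return ",".join(fields + driver)


def demo():
    # SQLite in memory stands in for the real driver database (an assumption).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE driverinfo (idDriverInfo INTEGER, DriverName TEXT, DriverAge INTEGER)"
    )
    conn.execute("INSERT INTO driverinfo VALUES (7, 'Alice', 41)")
    lookup = DriverLookup(conn)
    lookup.initialize()
    return lookup.on_trigger("7,08:00,08:30,12.50,2")
```

With this approach the database is hit once at startup, and each event costs only a dictionary lookup. The trade-off is that the cache goes stale if the driver table changes, so a real flow would need some refresh strategy.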

Expert Contributor

Thanks @Matt Burgess!

I am wondering whether ExecuteSQL accepts an incoming connection. After getting the CSV, I convert it to JSON and then extract all the information into attributes.

These attributes will be available to the ExecuteSQL processor if I direct the flow to it. Can I then use it to perform a query like

SELECT `driverinfo`.`idDriverInfo`, `driverinfo`.`DriverName`, `driverinfo`.`DriverAge`, `driverinfo`.`DriverLicence`, `driverinfo`.`DriverCar`, `driverinfo`.`DriverNationality`, `driverinfo`.`DriverRating` FROM `driverdb`.`driverinfo` WHERE `driverinfo`.`idDriverInfo`= ${id};

where ${id} is an attribute of the incoming flow file? If the query is successful, will a new flow file be generated? What will happen to the original flow file?

Super Guru

Yes, if ExecuteSQL has an incoming connection, it expects the query to be the content of the flow file. If you need to generate that query and evaluate Expression Language, use ReplaceText before ExecuteSQL. ExecuteSQL will not (currently) evaluate Expression Language for incoming queries; feel free to write a Jira to add this improvement.
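Conceptually, the ReplaceText step turns the flow file's attributes into the query text that ExecuteSQL then reads as content. A rough analogy in Python is shown below, using `string.Template`, which happens to share the `${id}` placeholder syntax. This is not NiFi's Expression Language, and `build_query` and the `attributes` dictionary are hypothetical names for the sketch.

```python
from string import Template

# Query template with a ${id} placeholder, shortened from the query in this thread.
QUERY_TEMPLATE = Template(
    "SELECT DriverName, DriverAge FROM driverdb.driverinfo "
    "WHERE idDriverInfo = ${id}"
)


def build_query(attributes):
    # Coerce the attribute to an integer first, so arbitrary text
    # cannot be spliced into the SQL statement.
    driver_id = int(attributes["id"])
    return QUERY_TEMPLATE.substitute(id=driver_id)
```

For example, `build_query({"id": "42"})` produces a query ending in `WHERE idDriverInfo = 42`, which is the kind of content the flow file would carry into ExecuteSQL.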

Expert Contributor

Hi @Matt Burgess

I tried the technique mentioned below, and it works. I get the database result as the content of the flow file and the CSV data as its attributes. Eventually I can also move the database result into attributes, so that all the data is in one place and therefore merged. I am not sure if this is the ideal solution, but it works! Thoughts?

Super Guru

Attributes are held in memory and follow a flow file around (each flow file gets its own copy of the attributes), so you want to keep the number and size of attributes to a minimum for best performance. However, if the solution works for you and doesn't cause performance or memory issues, then great! 🙂
