Apache NiFi - Add attribute to FlowFile based on SQL query result set

Explorer

Hi all,

Let's say I have a FlowFile with some given attributes.

What I want to achieve is to add a new attribute to the FlowFile whose value is retrieved from a SQL Server query, with two existing attributes passed in the WHERE clause (the query will, of course, return only one result).

 

Just for example

Current Flow file:

year = 2018

brand = audi

I would like to add a third attribute called "model", with its value retrieved from the following query:

"SELECT model FROM cars WHERE year=${year} AND brand=${brand}"

 

I've searched around but still haven't found a good answer.

I tried evaluating the lookup services, but they don't seem usable in this scenario.

 

Can anyone point me in the right direction?

 

Many Thanks

6 REPLIES

Master Guru

@Ray82 
I am assuming you are using the ExecuteSQL processor to execute the SQL SELECT statement example you shared.  The response would be written to the content of the FlowFile passed to the success relationship.  You could use the ExtractText processor to extract content from the FlowFile and assign it to a new FlowFile attribute that you name "model".
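
As a rough illustration of the ExtractText idea, here is a minimal Python sketch (not NiFi code) that captures a value from text content into a new attribute. It assumes the query result has already been reduced to plain text such as `model=A3`; that text form, the regex, and the helper name `extract_to_attribute` are all hypothetical and only for illustration.

```python
import re


def extract_to_attribute(content: str, attributes: dict, name: str, pattern: str) -> dict:
    """Mimic the idea behind ExtractText: run a regex over the content and
    store the first capture group in a new attribute. The content is not changed."""
    updated = dict(attributes)  # copy so the caller's attributes are not mutated
    match = re.search(pattern, content)
    if match:
        updated[name] = match.group(1)
    return updated


# Hypothetical text form of the query result (an assumption, not real ExecuteSQL output).
result_content = "model=A3"
attrs = {"year": "2018", "brand": "audi"}

new_attrs = extract_to_attribute(result_content, attrs, "model", r"model=(\w+)")
print(new_attrs)
```

This is a simplification: the real processor has more options than a single capture group, so treat it only as a picture of "content in, attribute out".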

If you found this response assisted with your query, please take a moment to log in and click on "Accept as Solution" below this post.

Thank you,

Matt

Explorer

Your assumption is correct, however...

My original FlowFile contains some attributes, as mentioned, and some JSON content.

After that I run the ExecuteSQL processor, which retrieves the information I would like to append to the original FlowFile, but...

The FlowFile coming from the success relationship of ExecuteSQL contains only the result set of the query, and the original FlowFile content is completely overwritten.

 

The result I want to achieve is the following:

 

1 - Original flow file

Contents:

{

'list_price':100,

'discount':10,

}

Attributes:

year = 2018

brand = audi

 

2 - ExecuteSQL to fetch a model

 

3 - Append model to original flow file as an additional attribute like this:

Contents:

{

'list_price':100,

'discount':10,

}

Attributes:

year = 2018

brand = audi

model=A3

 

What am I missing here?

Many Thanks

Master Guru

@Ray82 

Thank you for the additional details. It appears we have a disconnect in NiFi terminology being used. 

A NiFi FlowFile is the object that is passed from one component to another via connection on the NiFi UI canvas.   A FlowFile consists of two parts:
1. FlowFile Content - This is the content for the FlowFile and is stored in the NiFi content repository within a content claim.  A single content claim may contain the content for one to many FlowFiles.
2. FlowFile Attributes/Metadata - Written to the NiFi FlowFile repository. This is a collection of metadata about the FlowFile, such as which claim contains the content for the FlowFile, the offset within that claim where the content starts, and the length of the content.  It also contains attributes associated with the specific FlowFile, like filename, size, etc.  Users can also add additional FlowFile attributes.  These FlowFile attributes are not part of the content of the FlowFile.

- Your use case starts with an existing FlowFile with specific content 
- So I am guessing that you are extracting portions of content from your original FlowFile and assigning them to NiFi FlowFile attributes? 
- Then using those FlowFile attributes in the SQL query executed by the ExecuteSQL processor?
    The ExecuteSQL processor is designed to write the SQL query response to the content of the FlowFile (new content, does not append to existing content)
- What you really want to do is preserve the original FlowFile and add another FlowFile attribute to it, retrieved using your ExecuteSQL, without overwriting the original FlowFile content?
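
To make the content/attributes split concrete, here is a small Python model (an illustration only, not NiFi's actual classes). The `FlowFile` dataclass and the `execute_sql_like` helper are hypothetical names; the sketch just shows why the JSON content is lost after a step that replaces content, while the attributes carry over.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Toy model: content and attributes are stored separately."""
    content: bytes
    attributes: dict = field(default_factory=dict)


def execute_sql_like(flowfile: FlowFile, query_result: bytes) -> FlowFile:
    """Stand-in for a processor that writes its result as NEW content.
    Attributes are copied across; the previous content is not kept."""
    return FlowFile(content=query_result, attributes=dict(flowfile.attributes))


original = FlowFile(
    content=b'{"list_price": 100, "discount": 10}',
    attributes={"year": "2018", "brand": "audi"},
)

# b"A3" is a placeholder for the query result, not real ExecuteSQL output.
after_sql = execute_sql_like(original, b"A3")

print(after_sql.content)     # the JSON is gone from this FlowFile
print(after_sql.attributes)  # year and brand are still present
```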

Maybe you could use the DistributedMapCache processors to accomplish this:

MattWho_0-1660239255988.png

 


The processor in the upper right corner is an UpdateAttribute processor.  It is used to create a cacheID that will persist on both FlowFiles output from this processor (original and clone since "success" relationship was drawn twice).  

MattWho_1-1660241492158.png

It simply takes the UUID from the original FlowFile and adds it to each FlowFile in the FlowFile attribute "cacheID".
Then you would still have your ExecuteSQL and ExtractText flow to replace the content with just the model, "A3".
Then I configure both my PutDistributedMapCache and FetchDistributedMapCache processors to use "${cacheID}" as the "Cache Entry Identifier".
The FlowFile with the original content will move to the FetchDistributedMapCache processor, where it will loop on the "not.found" relationship connection until the other FlowFile using the same cacheID value writes your model "A3" to that unique cache entry via the PutDistributedMapCache processor (which writes the content of the FlowFile to the cache entry).
FetchDistributedMapCache:

MattWho_2-1660241801201.png

PutDistributedMapCache:

MattWho_3-1660241854067.png

In my example I am using the "DistributedMapCacheClientService" (simply because it is quick and easy), but there are better options that offer High Availability (HA).  The DistributedMapCacheClientService requires that NiFi also has a DistributedMapCacheServer controller service for it to talk to and store your cache entries in.

If you found this response assisted with your query, please take a moment to log in and click on "Accept as Solution" below this post.

Thank you,

Matt

Explorer

Hi,

thanks for your guidance.

I am currently on vacation. I will test it once I am back and will certainly mark it as solved if it works as expected.

In the meantime, many thanks.

Explorer

Hi, many thanks for your help.

The solution provided works like a charm!!!!

Community Manager

@Ray82, has @MattWho's reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.



Regards,

Vidya Sargur,
Community Manager

