Created 08-10-2022 09:25 AM
Hi all,
Let's say I have a FlowFile with some given attributes.
What I want to achieve is to add a new attribute to the FlowFile whose value is retrieved from a SQL Server query, where two existing attributes are passed in the WHERE clause (the query will, of course, return only one result).
Just for example
Current Flow file:
year = 2018
brand = audi
I would like to add a third attribute called "model" with a value retrieved from the following query:
"SELECT model FROM cars WHERE year=${year} AND brand='${brand}'"
I've searched around but still haven't found a good answer.
I tried evaluating the lookup services, but they don't seem usable in this scenario.
Can anyone point me in the right direction?
Many Thanks
Created 08-10-2022 11:02 AM
@Ray82
I am assuming you are using the ExecuteSQL processor to execute the SQL SELECT statement example you shared. The response would be written to the content of the FlowFile passed to the success relationship. You could then use the ExtractText processor to extract content from the FlowFile and assign it to a new FlowFile attribute that you name "model".
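To make the two steps concrete, here is a rough Python sketch of what happens to the FlowFile, not actual NiFi code. It assumes an in-memory SQLite table as a hypothetical stand-in for the SQL Server table, and it treats the query result as a plain string, which is a simplification (the real ExecuteSQL processor emits the result set in Avro format). The regular expression is only an illustrative pattern.

```python
import re
import sqlite3

# Hypothetical stand-in for the SQL Server "cars" table from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cars (year INTEGER, brand TEXT, model TEXT)")
conn.execute("INSERT INTO cars VALUES (2018, 'audi', 'A3')")

# Attributes already present on the FlowFile.
attributes = {"year": "2018", "brand": "audi"}

# ExecuteSQL step: the query result becomes the (new) FlowFile content.
row = conn.execute(
    "SELECT model FROM cars WHERE year = ? AND brand = ?",
    (int(attributes["year"]), attributes["brand"]),
).fetchone()
content = row[0]

# ExtractText step: a regex capture group is copied into a new attribute.
match = re.search(r"(.+)", content)
if match:
    attributes["model"] = match.group(1)

print(attributes)
```

Note that in this sketch the content now holds only the query result, which is exactly the overwrite behaviour discussed in the follow-up posts.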
If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
Created 08-11-2022 02:21 AM
Your assumption is correct, however...
My original FlowFile contains some attributes, as mentioned, and some JSON content.
After that I run the ExecuteSQL processor, which retrieves the information I would like to append to the original FlowFile, but...
The FlowFile coming from the success relationship of ExecuteSQL contains only the result set of the query, and the original FlowFile content is completely overwritten.
The result i want to achieve is the following:
1 - Original flow file
Contents:
{
"list_price": 100,
"discount": 10
}
Attributes:
year = 2018
brand = audi
2 - ExecuteSQL to fetch a model
3 - Append model to the original flow file as an additional attribute, like this:
Contents:
{
"list_price": 100,
"discount": 10
}
Attributes:
year = 2018
brand = audi
model = A3
What am I missing here?
Many Thanks
Created 08-11-2022 11:23 AM
@Ray82
Thank you for the additional details. It appears we have a disconnect in the NiFi terminology being used.
A NiFi FlowFile is the object that is passed from one component to another via a connection on the NiFi UI canvas. A FlowFile consists of two parts:
1. FlowFile Content - This is the content for the FlowFile and is stored in the NiFi content repository within a content claim. A single content claim may contain the content for one to many FlowFiles.
2. FlowFile Attributes/Metadata - Written to the NiFi FlowFile repository. This is a collection of metadata about the FlowFile, such as which claim contains the content for the FlowFile, the offset within that claim where the content starts, and the length of the content. It also contains attributes associated with the specific FlowFile, like filename, size, etc. Users can also add additional FlowFile attributes. These FlowFile attributes are not part of the content of the FlowFile.
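As a rough mental model of the two parts described above, here is a small Python sketch. The FlowFile class is purely illustrative and is not a NiFi API; the sample content and attributes are the ones from this thread.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Illustrative model only: content and attributes are stored separately."""
    content: bytes
    attributes: dict = field(default_factory=dict)


ff = FlowFile(
    content=b'{"list_price": 100, "discount": 10}',
    attributes={"year": "2018", "brand": "audi"},
)

content_before = ff.content

# Adding an attribute changes only the metadata, never the content.
ff.attributes["model"] = "A3"

print(ff.content == content_before)
print(sorted(ff.attributes))
```

The point of the sketch is that adding "model" as an attribute leaves the JSON content untouched, which is the outcome the original question is after.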
- Your use case starts with an existing FlowFile with specific content
- So I am guessing that you are extracting portions of content from your original FlowFile and assigning them to NiFi FlowFile attributes?
- Then using those FlowFile attributes in the SQL query executed by the ExecuteSQL processor?
The ExecuteSQL processor is designed to write the SQL query response to the content of the FlowFile (new content; it does not append to the existing content).
- What you really want to do is preserve the original FlowFile and add another FlowFile attribute to it, retrieved using your ExecuteSQL, without overwriting the original FlowFile content?
Maybe you could use the DistributedMapCache processors to accomplish this:
The processor in the upper right corner of the flow is an UpdateAttribute processor. It is used to create a cacheID that will persist on both FlowFiles output from this processor (the original and a clone, since the "success" relationship was drawn twice).
It simply takes the UUID from the original FlowFile and adds it to each FlowFile in the FlowFile attribute "cacheID".
You would then still have your ExecuteSQL and ExtractText flow, which replaces the content with just the model, "A3".
Then I configure both my PutDistributedMapCache and FetchDistributedMapCache processors to use "${cacheID}" as the "Cache Entry Identifier".
The FlowFile with the original content moves to the FetchDistributedMapCache processor, where it loops on the "not.found" relationship until the other FlowFile, which carries the same cacheID value, writes your model "A3" to that unique cache entry via the PutDistributedMapCache processor (which writes the content of the FlowFile to the cache entry).
[Screenshots: FetchDistributedMapCache and PutDistributedMapCache processor configuration]
In my example I am using the "DistributedMapCacheClientService" (simply because it is quick and easy), but there are better options that offer High Availability (HA). The DistributedMapCacheClientService requires that NiFi also has a DistributedMapCacheServer controller service for it to talk to and store your cache entries in.
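The cache hand-off described above can be simulated in a few lines of Python. This is only a sketch of the idea, not NiFi code: a plain dict stands in for the map cache, the function names put_cache and fetch_cache are hypothetical, and it assumes the fetch step is configured to place the cached value into an attribute (here "model") rather than into the content.

```python
import uuid

cache = {}  # stands in for the distributed map cache


def put_cache(flowfile):
    """Like PutDistributedMapCache: store the FlowFile content under its cacheID."""
    cache[flowfile["attributes"]["cacheID"]] = flowfile["content"]


def fetch_cache(flowfile, target_attribute):
    """Like FetchDistributedMapCache: route to 'success' or 'not.found'."""
    key = flowfile["attributes"]["cacheID"]
    if key not in cache:
        return "not.found"
    # Assumption: the cached value is written to an attribute, not the content.
    flowfile["attributes"][target_attribute] = cache[key]
    return "success"


original = {
    "content": '{"list_price": 100, "discount": 10}',
    "attributes": {"year": "2018", "brand": "audi", "uuid": str(uuid.uuid4())},
}

# UpdateAttribute: copy the UUID into "cacheID" before the flow forks.
original["attributes"]["cacheID"] = original["attributes"]["uuid"]

# "success" drawn twice: the clone carries the same cacheID.
clone = {"content": original["content"], "attributes": dict(original["attributes"])}

# The original reaches the fetch step first and loops on "not.found".
first_try = fetch_cache(original, "model")

# Clone path: ExecuteSQL + ExtractText leave just the model as content.
clone["content"] = "A3"
put_cache(clone)

# On a later retry the original finds the entry and gains the attribute.
second_try = fetch_cache(original, "model")

print(first_try, second_try, original["attributes"]["model"])
```

Because the cache key is derived from the original FlowFile's UUID, each original/clone pair gets its own entry, so concurrent FlowFiles should not pick up each other's results.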
If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
Created 08-22-2022 05:12 AM
Hi,
thanks for your guidance.
I am currently on vacation. I will test it once I am back, and I will certainly mark it as solved if it works as expected.
Many thanks in the meantime.
Created 08-29-2022 04:50 AM
Hi, many thanks for your help.
The solution provided works like a charm!!!!
Created 07-12-2024 05:03 AM
What if I want to pass attributes instead of flowfile content?
Created 08-21-2022 10:21 PM
@Ray82, Has @MattWho's reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future.
Regards,
Vidya Sargur,