Created 03-16-2023 02:20 AM
I'm trying to use NiFi to dynamically filter FlowFiles in JSON format and keep only those values that are present in a SQL table.
I'm using the QueryRecord processor with a query like:
SELECT * FROM FLOWFILE WHERE ${key} = ${value}
Only those values that are present in the QueryDatabaseTable variables should be kept.
The FlowFiles to be filtered look like this:
[ { "CustomerID" : 1, "OrderNbr" : "12", "Amount":100 } ]
The CustomerID field is the one to check when filtering the data.
My current NiFi flow:
Created on 03-16-2023 07:23 AM - edited 03-16-2023 07:33 AM
Have you thought about using LookupRecord instead of QueryRecord? In LookupRecord you can define a Record Reader to read your JSON input data, as well as a Record Writer to write your data in whatever format you want. In addition, you have the Lookup Service, where you can use the DatabaseRecordLookupService (as a Controller Service) to connect to your database and look up a key-value combination based on a table name.
Alternatively, you can try LookupAttribute if you want to add an attribute to your JSON FlowFile with the value you want to use for routing the file. In LookupAttribute you can use the SimpleDatabaseLookupService, where you connect to your DB and define the table in which you query the key-value combination.
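To make the lookup idea more concrete, here is a minimal Python sketch of the logic only, not of NiFi itself. It assumes a hypothetical table named customers with a CustomerID key column, uses an in-memory SQLite database as a stand-in for the real one, and splits the records into matched and unmatched lists, which is roughly what a lookup-based filter would do per record.

```python
import json
import sqlite3

# Stand-in for the real SQL table. The table name "customers" and its
# columns are assumptions made for this illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (CustomerID INTEGER PRIMARY KEY, Name TEXT)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", [(1, "Alice"), (3, "Carol")])


def lookup(customer_id):
    """Return the value stored for the key, or None when the key is absent."""
    row = conn.execute(
        "SELECT Name FROM customers WHERE CustomerID = ?", (customer_id,)
    ).fetchone()
    return row[0] if row else None


def filter_records(flowfile_content):
    """Split the JSON records by whether their CustomerID exists in the table."""
    matched, unmatched = [], []
    for record in json.loads(flowfile_content):
        target = matched if lookup(record["CustomerID"]) is not None else unmatched
        target.append(record)
    return matched, unmatched


content = (
    '[{"CustomerID": 1, "OrderNbr": "12", "Amount": 100},'
    ' {"CustomerID": 2, "OrderNbr": "13", "Amount": 50}]'
)
matched, unmatched = filter_records(content)
print(matched)
```

Only the record with CustomerID 1 ends up in matched, because 2 is not in the stand-in table. The second record in content is made up for the example.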
As I did not fully understand your question (nor do I see one), the provided answer is a generic one.
Created 03-16-2023 08:13 AM
My question is more about how to write the SQL query to filter the data. My current query: SELECT * FROM FLOWFILE WHERE Customer_AccountID = ${value}
Created 03-17-2023 01:00 AM
Well, if I understand your flow logic correctly, what you are trying to achieve is not really possible, so please correct me if I am wrong.
In your QueryRecord you receive input from the InputPort and from the QueryDatabaseTable flow as well. Based on the FlowFile coming from that InputPort, you are trying to filter the data using a SQL query that is built from the input coming from the QueryDatabaseTable flow, right? Basically you want to filter the InputPort data (FlowFile1) based on the ${value} from QueryDatabaseTable (FlowFile2).
Based on my current experience with NiFi, such a thing is not possible because the processors work on a single FlowFile and not with 2+ FlowFiles (unless merged). But maybe somebody with more experience can tell you otherwise.
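A rough Python sketch of why this fails, under stated assumptions: the ${...} placeholders are resolved from the attributes of the same FlowFile whose content is being queried, and a missing attribute resolves to an empty string. The helper names resolve and query_record are hypothetical, and SQLite stands in for the SQL engine that QueryRecord uses.

```python
import re
import sqlite3


def resolve(template, attributes):
    """Replace each ${name} with the attribute value; unknown names become ''."""
    return re.sub(
        r"\$\{(\w+)\}", lambda m: str(attributes.get(m.group(1), "")), template
    )


def query_record(records, template, attributes):
    """Load one FlowFile's records into a table named FLOWFILE and query it."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE FLOWFILE (CustomerID INTEGER, OrderNbr TEXT, Amount INTEGER)"
    )
    conn.executemany(
        "INSERT INTO FLOWFILE VALUES (:CustomerID, :OrderNbr, :Amount)", records
    )
    return conn.execute(resolve(template, attributes)).fetchall()


template = "SELECT * FROM FLOWFILE WHERE ${key} = ${value}"
records = [
    {"CustomerID": 1, "OrderNbr": "12", "Amount": 100},
    {"CustomerID": 2, "OrderNbr": "13", "Amount": 50},
]

# Attributes present on the same FlowFile: the query resolves and filters.
rows = query_record(records, template, {"key": "CustomerID", "value": "1"})

# Attributes that live on a different FlowFile are not visible here, so the
# placeholders resolve to nothing and the resulting SQL is invalid.
try:
    query_record(records, template, {})
    failed = False
except sqlite3.OperationalError:
    failed = True
```

So the filter only works when key and value are attributes of the FlowFile that carries the JSON records. Values that arrive in a separate FlowFile from QueryDatabaseTable never reach the query.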
You would need to rethink your flow and build something else, hence the idea of using LookupRecord/LookupAttribute.