Created 01-20-2021 02:54 AM
I'm using CaptureChangeMySQL Processor to capture MySQL event data. for the update event I got the following data in my FlowFile.
{
"type" : "update",
"timestamp" : 1610120620000,
"binlog_filename" : "delta.000002",
"binlog_position" : 1943,
"database" : "test",
"table_name" : "Employee",
"table_id" : 92,
"columns" : [ {
"id" : 1,
"name" : "id",
"column_type" : 4,
"last_value" : 1,
"value" : 1
}, {
"id" : 2,
"name" : "Name",
"column_type" : 12,
"last_value" : "Khalid",
"value" : "John"
}, {
"id" : 3,
"name" : "Email",
"column_type" : 12,
"last_value" : "lacus.varius@utsem.net",
"value" : "lacus.varius@utsem.net"
}, {
"id" : 4,
"name" : "City",
"column_type" : 12,
"last_value" : "Luziânia",
"value" : "Luziânia"
} ]
}
then I'm using EvaluateJsonPath Processor to add attributes like event_type,timestamp,table_name,value of column name id
then I want to use the RouteOnAttribute processor to query other tables apart from the set attribute name (table_name) to merge all records in a single JSON that has the same id.
Now the main problem is that if more tables updated at the same time it will generate extra record(which is completely useless also it will query other tables multiple times)
So, I decided to use QueryRecord Processor and Set Run Schedule at 2-5 minutes.
Now I want to Query FlowFiles like
SELECT * FROM FLOWFILES WHERE '$.timestamp' = (SELECT MAX('$.timestamp') FROM FLOWFILE) GROUP BY '$.id'
It means It will fetch those flow files which has a unique id and Maximum timestamp value. this query is not working. so how can I use attributes in the QueryRecord Processor to achieve this scenario? or is there any better way to handle this?
Created 01-21-2021 08:25 AM
use updaterecord to add fields
Created 01-28-2021 01:20 AM
@TimothySpann I've added attributes using EvaluateJsonPath.
I want to filter my queued flow files based on this condition
Get those flow files which has a higher value of event_time attribute among the same id attribute.
I've checked the Updateattribute processor but didn't succeed to achieve this scenario.
Created 01-28-2021 07:34 AM
QueryRecord is the way to go. You can compare with a sql
SELECT * FROM FLOWFILE
WHERE timestamp > ${event_time}
and value = ${id}