Apache NiFi QueryRecord based on FlowFile attribute

New Contributor

I'm using the CaptureChangeMySQL processor to capture MySQL event data. For an update event, I get the following data in my FlowFile:

 

{
  "type" : "update",
  "timestamp" : 1610120620000,
  "binlog_filename" : "delta.000002",
  "binlog_position" : 1943,
  "database" : "test",
  "table_name" : "Employee",
  "table_id" : 92,
  "columns" : [ {
    "id" : 1,
    "name" : "id",
    "column_type" : 4,
    "last_value" : 1,
    "value" : 1
  }, {
    "id" : 2,
    "name" : "Name",
    "column_type" : 12,
    "last_value" : "Khalid",
    "value" : "John"
  }, {
    "id" : 3,
    "name" : "Email",
    "column_type" : 12,
    "last_value" : "lacus.varius@utsem.net",
    "value" : "lacus.varius@utsem.net"
  }, {
    "id" : 4,
    "name" : "City",
    "column_type" : 12,
    "last_value" : "Luziânia",
    "value" : "Luziânia"
  } ]
}

Then I use the EvaluateJsonPath processor to add attributes such as event_type, timestamp, table_name, and the value of the column named id.
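As a rough illustration of what that extraction step produces, here is a minimal Python sketch that pulls the same four values out of the event JSON. It only mimics the result of EvaluateJsonPath and is not NiFi code; the function name `extract_attributes` and the trimmed sample are assumptions made for this example.

```python
import json

# Trimmed copy of the update event above (only two columns kept).
SAMPLE_EVENT = """
{
  "type": "update",
  "timestamp": 1610120620000,
  "table_name": "Employee",
  "columns": [
    {"id": 1, "name": "id", "column_type": 4, "last_value": 1, "value": 1},
    {"id": 2, "name": "Name", "column_type": 12, "last_value": "Khalid", "value": "John"}
  ]
}
"""


def extract_attributes(event_json: str) -> dict:
    """Hypothetical helper: collect the values the post stores as attributes."""
    event = json.loads(event_json)
    # Find the column whose name is "id" and take its current value.
    id_value = next(
        (col["value"] for col in event["columns"] if col["name"] == "id"),
        None,
    )
    # NiFi stores attributes as strings; JSON types are kept here for readability.
    return {
        "event_type": event["type"],
        "timestamp": event["timestamp"],
        "table_name": event["table_name"],
        "id": id_value,
    }


print(extract_attributes(SAMPLE_EVENT))
```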

Next, I want to use the RouteOnAttribute processor to query the other tables (all except the one named in the table_name attribute) and merge all records that have the same id into a single JSON document.

The main problem is that if several tables are updated at the same time, this generates extra records, which are completely useless, and it also queries the other tables multiple times.

So I decided to use the QueryRecord processor and set its Run Schedule to 2-5 minutes.

Now I want to query the FlowFiles like this:

SELECT * FROM FLOWFILES WHERE '$.timestamp' = (SELECT MAX('$.timestamp') FROM FLOWFILE) GROUP BY '$.id'

That is, for each unique id it should fetch the FlowFile with the maximum timestamp value. This query is not working. How can I use attributes in the QueryRecord processor to achieve this? Or is there a better way to handle it?
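The "latest event per id" selection described here can be sketched as a correlated subquery. The example below runs it in SQLite from Python purely to show the shape of the SQL. It assumes the events have already been combined into one record set with plain `id` and `event_time` fields, since, as far as I know, QueryRecord evaluates its SQL against the records inside one FlowFile at a time rather than across queued FlowFiles. The table name `events`, the `Address` table, and the extra timestamps are made up for illustration, and QueryRecord's Calcite-based SQL may differ in details from SQLite.

```python
import sqlite3


def latest_per_id(rows):
    """Return, for each id, the row(s) with the highest event_time."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER, event_time INTEGER, table_name TEXT)"
    )
    conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)
    # Correlated subquery: keep a row only if its event_time equals
    # the maximum event_time among rows sharing the same id.
    query = """
        SELECT e.id, e.event_time, e.table_name
        FROM events AS e
        WHERE e.event_time = (
            SELECT MAX(i.event_time) FROM events AS i WHERE i.id = e.id
        )
        ORDER BY e.id
    """
    result = conn.execute(query).fetchall()
    conn.close()
    return result


# Hypothetical events: id 1 was touched twice, id 2 once.
rows = [
    (1, 1610120620000, "Employee"),
    (1, 1610120625000, "Address"),
    (2, 1610120621000, "Employee"),
]
print(latest_per_id(rows))
```

Note that if two events for the same id share the same maximum timestamp, both are returned, so a tie-breaker would be needed where exactly one record per id is required.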

3 REPLIES

Master Guru

Use UpdateRecord to add fields.

New Contributor

@TimothySpann I've added attributes using EvaluateJsonPath. 

I want to filter my queued FlowFiles based on this condition:

Among FlowFiles with the same id attribute, keep the one with the highest event_time attribute value.

I've looked at the UpdateAttribute processor but couldn't achieve this with it.

Master Guru

QueryRecord is the way to go. You can do the comparison in SQL:

SELECT * FROM FLOWFILE
WHERE timestamp > ${event_time}
AND value = ${id}