Apache Nifi QueryRecord based on FlowFile attribute

New Contributor

I'm using the CaptureChangeMySQL processor to capture MySQL event data. For an update event, I get the following data in my FlowFile:

 

{
  "type" : "update",
  "timestamp" : 1610120620000,
  "binlog_filename" : "delta.000002",
  "binlog_position" : 1943,
  "database" : "test",
  "table_name" : "Employee",
  "table_id" : 92,
  "columns" : [ {
    "id" : 1,
    "name" : "id",
    "column_type" : 4,
    "last_value" : 1,
    "value" : 1
  }, {
    "id" : 2,
    "name" : "Name",
    "column_type" : 12,
    "last_value" : "Khalid",
    "value" : "John"
  }, {
    "id" : 3,
    "name" : "Email",
    "column_type" : 12,
    "last_value" : "lacus.varius@utsem.net",
    "value" : "lacus.varius@utsem.net"
  }, {
    "id" : 4,
    "name" : "City",
    "column_type" : 12,
    "last_value" : "Luziânia",
    "value" : "Luziânia"
  } ]
}

Then I use the EvaluateJsonPath processor to add attributes such as event_type, timestamp, table_name, and the value of the column named id.
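
As a rough illustration of that extraction step, here is a minimal Python sketch that pulls the same four values out of an event shaped like the one above. It is plain Python, not NiFi configuration: the helper name extract_attributes is hypothetical, the event is shortened to two columns, and the actual JSONPath expressions used in EvaluateJsonPath are not shown in this post.

```python
import json

# Shortened copy of the update event shown above (two of the four columns).
event_json = """
{
  "type": "update",
  "timestamp": 1610120620000,
  "table_name": "Employee",
  "columns": [
    {"id": 1, "name": "id", "column_type": 4, "last_value": 1, "value": 1},
    {"id": 2, "name": "Name", "column_type": 12, "last_value": "Khalid", "value": "John"}
  ]
}
"""


def extract_attributes(event):
    """Hypothetical helper: collect the fields the post turns into attributes."""
    # Find the column whose name is "id"; its "value" is the row's current id.
    id_column = next((c for c in event["columns"] if c["name"] == "id"), None)
    return {
        "event_type": event["type"],
        "timestamp": event["timestamp"],
        "table_name": event["table_name"],
        "id": None if id_column is None else id_column["value"],
    }


attributes = extract_attributes(json.loads(event_json))
print(attributes)
```

Note that the id comes from the element of the columns array whose name is "id", not from that element's own "id" field, which is only the column position.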

Next, I want to use the RouteOnAttribute processor to query the tables other than the one named in the table_name attribute, and merge all records that have the same id into a single JSON document.


The main problem is that if several tables are updated at the same time, this generates extra records, which are useless and also cause the other tables to be queried multiple times.

So I decided to use the QueryRecord processor with its Run Schedule set to 2-5 minutes.

Now I want to query the FlowFiles like this:

SELECT * FROM FLOWFILES WHERE '$.timestamp' = (SELECT MAX('$.timestamp') FROM FLOWFILE) GROUP BY '$.id'

That is, for each unique id it should fetch the FlowFile with the maximum timestamp value. This query is not working. How can I use attributes in the QueryRecord processor to achieve this? Or is there a better way to handle it?

3 REPLIES

Re: Apache Nifi QueryRecord based on FlowFile attribute

Super Guru

Use UpdateRecord to add fields.

Re: Apache Nifi QueryRecord based on FlowFile attribute

New Contributor

@TimothySpann I've added attributes using EvaluateJsonPath. 

I want to filter my queued FlowFiles on this condition: among FlowFiles with the same id attribute, keep only the one with the highest event_time attribute.

I've tried the UpdateAttribute processor but couldn't achieve this with it.

Re: Apache Nifi QueryRecord based on FlowFile attribute

Super Guru

QueryRecord is the way to go. You can reference FlowFile attributes in the SQL through Expression Language:

SELECT * FROM FLOWFILE
WHERE timestamp > ${event_time}
AND value = ${id}
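
For the "newest event per id" part of the question, keep in mind that QueryRecord runs its SQL against the records inside a single FlowFile, so events that arrive in separate FlowFiles would first have to be combined into one (for example with MergeRecord). The shape of such a query can be sketched with Python's sqlite3 as a stand-in engine. This is an illustration under assumptions, not a tested NiFi configuration: the flat columns id, event_time and table_name are hypothetical record fields, the sample rows are made up for the demo, and QueryRecord's own SQL dialect may need different quoting or syntax.

```python
import sqlite3

# In-memory table named FLOWFILE, mimicking the table name QueryRecord exposes.
# The column layout is an assumption made for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (id INTEGER, event_time INTEGER, table_name TEXT)")
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?, ?)",
    [
        (1, 1610120620000, "Employee"),  # older event for id 1
        (1, 1610120625000, "Employee"),  # newer event for id 1
        (2, 1610120621000, "Employee"),  # only event for id 2
    ],
)

# Keep, for each id, only the row whose event_time equals that id's maximum.
latest_per_id = conn.execute(
    """
    SELECT f.id, f.event_time, f.table_name
    FROM FLOWFILE f
    JOIN (SELECT id, MAX(event_time) AS max_time
          FROM FLOWFILE
          GROUP BY id) m
      ON f.id = m.id AND f.event_time = m.max_time
    ORDER BY f.id
    """
).fetchall()

print(latest_per_id)
```

Joining against a per-id MAX subquery avoids the pattern in the original attempt, where a single global MAX(timestamp) would keep at most the newest event overall rather than the newest event for each id.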