NiFi custom processor to read database view on update of database

New Contributor

I am new to NiFi and am developing a custom processor to pull the most recent data from a psql database view. With the code below, I can retrieve the database view when the custom processor is initialized.

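// Requires java.sql.Connection, DriverManager, ResultSet, SQLException and Statement.
private void GetData() throws SQLException {
    // try-with-resources closes the connection, statement and result set even if the query fails
    try (Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
         Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
         ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table")) {
        while (rs.next()) {
            // Get data from database
        }
    }
}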

However, I am struggling to pick up recent updates to the database view. The main issue arises when a new entry is added to the database: because the database is queried only when the processor is initialized, the custom processor never sees the new entry.


I tried running the query inside the processor's onTrigger() method; however, this causes the flow to back up, because the database is queried for every flowfile (which is not ideal when thousands of flowfiles arrive per second).


Is there a way to query the database when the processor starts, without querying it for every flowfile? Alternatively, would it be possible to detect that the database has been modified and pull the data only then? Or to set a timer within the custom processor that pulls from the database periodically?


Any assistance is much appreciated, thank you in advance.

2 REPLIES

New Contributor

Super Guru

Have you checked out the following processor?

CaptureChangeMySQL

Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events are output as individual flow files ordered by the time at which the operation occurred.
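
For the timer idea raised in the question, one possible approach is to keep the query result in a small cache that reloads only after a fixed interval, so that most flowfiles are served from memory. The sketch below is plain Java with no NiFi dependencies. RefreshingCache, its loader, and the injectable clock are hypothetical names invented for illustration, not part of NiFi or JDBC. In a real processor the loader would presumably wrap the JDBC query, the cache could be created in a method annotated with @OnScheduled, and onTrigger() would just call get().

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a NiFi class): holds a loaded value and calls the
 * loader again only once maxAgeMillis has passed since the last load.
 */
final class RefreshingCache<T> {
    private final Supplier<T> loader;   // assumption: wraps the database query
    private final long maxAgeMillis;    // how long a loaded value stays fresh
    private final LongSupplier clock;   // time source in milliseconds, injectable for testing
    private T value;
    private long loadedAt;
    private boolean loaded;

    RefreshingCache(Supplier<T> loader, long maxAgeMillis, LongSupplier clock) {
        this.loader = loader;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    /** Convenience constructor that uses the system clock. */
    RefreshingCache(Supplier<T> loader, long maxAgeMillis) {
        this(loader, maxAgeMillis, System::currentTimeMillis);
    }

    /** Returns the cached value, reloading it first if it is missing or stale. */
    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= maxAgeMillis) {
            value = loader.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}
```

With something like new RefreshingCache<>(() -> loadRows(), 60_000L), where loadRows() stands in for the query shown in the question, the database would be hit at most about once a minute regardless of the flowfile rate. The trade-off is that new rows can go unseen for up to one interval; a change-data-capture approach avoids that delay at the cost of more setup.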