Created on 02-27-2024 11:15 PM - edited 02-27-2024 11:17 PM
I'm writing a flow in Apache NiFi. In the problematic part, I have a flowfile containing several records and a table (table 1) in MySQL Server that needs to be updated. Every record in the flowfile can be mapped to a row in table 1 based on the value in a key column. I want to achieve the following goals:
1. Update fields in the matched row using the field values from the record wherever they differ.
2. Record the changes in another MySQL table (table 2), which has these columns: the key column mentioned above, the date the field changed, a revision id (a column from the record in the flowfile), a field id (i.e., which field was changed; I have a table that maps column names to field ids, FYI), the old value, and the new value.
For example, if a record in the Flow File is:
{
'key': 1234, 'field_1': 'new_1', 'field_2': 'new_2', 'field_3': 'no_change_3'
}
and the corresponding row in table 1 is:
{
'key': 1234, 'field_1': 'old_1', 'field_2': 'old_2', 'field_3': 'no_change_3'
}
After processing, the row in table 1 should become:
{
'key': 1234, 'field_1': 'new_1', 'field_2': 'new_2', 'field_3': 'no_change_3'
}
and table 2 should have 2 new rows:
{
'key': 1234, 'date': 'some date', 'revision_id': 'some revision id', 'field_id': 1, 'old_value': 'old_1', 'new_value': 'new_1'
},
{
'key': 1234, 'date': 'some date', 'revision_id': 'some revision id', 'field_id': 2, 'old_value': 'old_2', 'new_value': 'new_2'
}
I have used PutDatabaseRecord to do the update, but that way I could not achieve the second goal, as the processor does not seem to allow customized queries.
Is there a way to achieve both goals here?
Created 02-28-2024 10:35 AM
Hi @iriszhuhao ,
I think you are trying to address two issues in one processor, and I don't think the PutDatabaseRecord processor can do both:
1- Insert/update the records in Table 1
2- Add a change log to Table 2
Let's address problem 1 first, with Table 1. PutDatabaseRecord can handle multiple records per flowfile if you have a valid JSON array containing all the records. In that case you can try setting the Data Record Path property and, depending on the NiFi version you are using, the "Statement Type Record Path" property, which determines the type of SQL statement (UPDATE, INSERT, etc.) for each record. This means you have to know beforehand which statement applies to which record, and that information must be set alongside each record as part of the whole JSON flowfile. If you don't have a statement type attached to each record, or the records are not in a JSON array (e.g., each JSON record is separated by a newline), then you have to split the records with a SplitText processor and do a lookup to find whether each record already exists in the database, so you can decide the proper SQL statement. For the lookup you can use something like the LookupRecord processor with a SimpleDatabaseLookupService.
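For illustration, if you go the Statement Type Record Path route, the flowfile could look something like the sketch below. The `statement_type` and `record` field names here are just assumptions; they only need to match whatever paths you configure in the Statement Type Record Path and Data Record Path properties:

```json
[
  {
    "statement_type": "UPDATE",
    "record": { "key": 1234, "field_1": "new_1", "field_2": "new_2", "field_3": "no_change_3" }
  },
  {
    "statement_type": "INSERT",
    "record": { "key": 5678, "field_1": "first_value" }
  }
]
```

with Statement Type Record Path set to `/statement_type` and Data Record Path set to `/record`.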
Regarding problem 2, adding the change log to Table 2, you have a few options:
1- After you execute PutDatabaseRecord, you can reuse the statement-type attribute you set during the LookupRecord step (record exists = update, missing = insert) to write the proper SQL script (or create a SQL stored procedure) that adds the change log, and run that SQL through the PutSQL processor's SQL Statement property. In this case you also have to extract the record fields into attributes, so the SQL statement can be supplied with the field_id, old_value & new_value and check which values have changed, or pass the whole JSON record to SQL and do the parsing there instead. This can be cumbersome if you have too many fields.
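As a rough sketch of what the per-field change-log SQL in option 1 might look like, using the table and column names from the example above (and assuming `field_1` maps to field id 1 in your mapping table). Note that for the old value to be read out of Table 1, a statement like this would have to run before the update is applied:

```sql
-- Hypothetical sketch: log a change for field_1 only when the value actually differs.
-- ${key}, ${revision_id} and ${field_1} stand for NiFi flowfile attributes substituted
-- via Expression Language in PutSQL's SQL Statement property.
INSERT INTO table2 (`key`, `date`, revision_id, field_id, old_value, new_value)
SELECT t1.`key`, NOW(), '${revision_id}', 1, t1.field_1, '${field_1}'
FROM table1 t1
WHERE t1.`key` = ${key}
  AND t1.field_1 <> '${field_1}';
```

You would need one such statement (or one stored-procedure branch) per tracked column, which is exactly why this approach scales poorly with many fields.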
2- Instead of handling this in NiFi, you can define an insert/update trigger on Table 1, then put the logic in the trigger to add the proper log information based on the old vs. new values captured by the trigger. Again, this might get cumbersome if you have too many fields to check, and it might impact performance when updating data in Table 1, but it is cleaner than the first option.
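A minimal sketch of what such a trigger could look like in MySQL, again assuming the table and column names from the example. One caveat: `revision_id` lives in the flowfile, not in Table 1, so the trigger can only see it if the updating connection passes it in somehow, e.g. via a session variable as below, which is itself fragile:

```sql
-- Hypothetical sketch of an update trigger that logs changed fields to table2.
-- @revision_id is a session variable the caller would have to SET before the UPDATE.
DELIMITER //
CREATE TRIGGER table1_log_changes
AFTER UPDATE ON table1
FOR EACH ROW
BEGIN
  -- <=> is MySQL's NULL-safe equality, so changes to/from NULL are logged too.
  IF NOT (OLD.field_1 <=> NEW.field_1) THEN
    INSERT INTO table2 (`key`, `date`, revision_id, field_id, old_value, new_value)
    VALUES (NEW.`key`, NOW(), @revision_id, 1, OLD.field_1, NEW.field_1);
  END IF;
  IF NOT (OLD.field_2 <=> NEW.field_2) THEN
    INSERT INTO table2 (`key`, `date`, revision_id, field_id, old_value, new_value)
    VALUES (NEW.`key`, NOW(), @revision_id, 2, OLD.field_2, NEW.field_2);
  END IF;
  -- ...one block per tracked column.
END//
DELIMITER ;
```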
3- Use CDC (Change Data Capture). I know this comes out of the box with SQL Server, but for MySQL I think you need a third-party tool to set it up. I can't give you much detail on the how-to because I don't know it myself, but you can research it. The idea here is to let CDC capture what has been changed/inserted/deleted, and then you can read that log info and store it in whatever format/table you desire. Here you don't have to check which field has changed, because the CDC does that for you.
Hope that helps. Please let me know if you have any questions; otherwise, please accept the solution.
Thanks
Created 03-05-2024 11:43 PM
Hi @SAMSAL ,
I have thought about writing a stored procedure that compares the field values and then logs the value changes to solve problem 2, but the records in a flowfile cannot be passed to a stored procedure to interact with the database data directly. Maybe I just don't know the correct processor or script to use. Do you have any insight on this?