Created 06-13-2024 03:11 AM
I'm using "PutDatabaseRecord (NiFi 1.25.0)" for data insert/update purposes in MySql.
Imagine,
I have a table called "Customer" which has running primary key called "Id" and the another field call "id_from_core". The only mentioned "Id" field as auto generated primary key in "Customer" table.
But when data insert through the NiFi flow, I mentioned "id_from_core" field as "Update Key" property in "PutDatabaseRecord".
Here is the "Customer" table,
CREATE TABLE `Customer` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`created_by` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`created_date` datetime(0) NULL DEFAULT NULL,
`record_status` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`surname` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`id_from_core` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 175 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
My problem is, update action is not work properly, while inserting same values in "id_from_core" field data, it should be a update, but it's not updated. Insert as a new record. Could you please anyone know the reason for that matter and what is the solution for that.
Thanks in advance.
Created 06-14-2024 02:12 AM
Hi @Thar11027
I stand corrected. Well, lets be more specific and you can't get more specific than looking the code itself in github :). It turns out the PutDatabaseRecord uses a DatabaseAdapter which is an interface type that gets implement by each Database Engine Type and passed through the DB service associated with this processor (DBCPConnectionPool). Those adapters are responsible for generating the SQL for each statement type (insert, update, delete....). For MySql there is an adapter called MySQLDatabaseAdapter and if you look at the genereateUpsertStatement method you will find that it uses the following syntax:
StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
.append(table)
.append("(").append(columns).append(")")
.append(" VALUES ")
.append("(").append(parameterizedInsertValues).append(")")
.append(" ON DUPLICATE KEY UPDATE ")
.append(parameterizedUpdateValues);
return statementStringBuilder.toString();
Notice the use of "ON DUPLICATE KEY UPDATE" syntax. If you look for what that means in MySQL (https://blog.devart.com/mysql-upsert.html ) you will find that yes it will check if the record key exists or not , and if it does then it will do an update state however that only works on table Primary Key. In your case for the Transaction tale it works because as you mentioned the transaction_id is the primary key and you probably passing this column as part of the record data, however for the other table the id set to auto increment and probably you are not passing it as part of the record and instead relying on none primary key id_from_core. Not sure if its possible to change your table where this column is your primary key, otherwise you will find yourself having to do lookup to find if it exists or not and may be get the id then do your upsert with the id but Im not sure how this will work with Auto Increment being set. Another option which I tend to do in my case to avoid adding more processors\control services is to create stored proc that will defer all that checking for update or insert to sql then use PutSQL processor to execute the stored proc passing all columns to it but this can be cumbersome if you have so many columns which seem to be your case. What you can do to avoid passing each column is pass record as json string and do json parsing to find the column values in mySQL.
Hope that helps.
Created 06-13-2024 04:43 AM
Hi @Thar11027 ,
I dont think there is an UPSERT statement in MySQL if Im not wrong. I think its treating it as regular insert and hence you are seeing duplicate entries. If you want to use PUTDatabaseRecord processor then you have to create two: one for insert and another for update and to decide which one you need to run you have to do Lookup to see if the customer with the same core id exists or not. For that you can use Lookup Record (refer to : https://community.cloudera.com/t5/Community-Articles/Data-flow-enrichment-with-NiFi-part-1-LookupRec... )processor to enrich your data with the customer core Id if exists, then you check if the record is found (meaning id exist) you route to Update otherwise you route to Insert.
Hope that helps. If it does please accept solution.
Thanks
Created 06-13-2024 09:52 PM
Thanks @SAMSAL for your quick response.
But I have another "PutDatabaseRecord" in work flow referring database table called "Transaction". It includes Primary key called "transaction_id" without auto incrementing and "transaction_id" is the value of "Update key" in "PutDatabaseRecord".
In here, update and insert process are working properly. What is the reason for difference.
Created 06-14-2024 02:12 AM
Hi @Thar11027
I stand corrected. Well, lets be more specific and you can't get more specific than looking the code itself in github :). It turns out the PutDatabaseRecord uses a DatabaseAdapter which is an interface type that gets implement by each Database Engine Type and passed through the DB service associated with this processor (DBCPConnectionPool). Those adapters are responsible for generating the SQL for each statement type (insert, update, delete....). For MySql there is an adapter called MySQLDatabaseAdapter and if you look at the genereateUpsertStatement method you will find that it uses the following syntax:
StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
.append(table)
.append("(").append(columns).append(")")
.append(" VALUES ")
.append("(").append(parameterizedInsertValues).append(")")
.append(" ON DUPLICATE KEY UPDATE ")
.append(parameterizedUpdateValues);
return statementStringBuilder.toString();
Notice the use of "ON DUPLICATE KEY UPDATE" syntax. If you look for what that means in MySQL (https://blog.devart.com/mysql-upsert.html ) you will find that yes it will check if the record key exists or not , and if it does then it will do an update state however that only works on table Primary Key. In your case for the Transaction tale it works because as you mentioned the transaction_id is the primary key and you probably passing this column as part of the record data, however for the other table the id set to auto increment and probably you are not passing it as part of the record and instead relying on none primary key id_from_core. Not sure if its possible to change your table where this column is your primary key, otherwise you will find yourself having to do lookup to find if it exists or not and may be get the id then do your upsert with the id but Im not sure how this will work with Auto Increment being set. Another option which I tend to do in my case to avoid adding more processors\control services is to create stored proc that will defer all that checking for update or insert to sql then use PutSQL processor to execute the stored proc passing all columns to it but this can be cumbersome if you have so many columns which seem to be your case. What you can do to avoid passing each column is pass record as json string and do json parsing to find the column values in mySQL.
Hope that helps.