Member since
07-07-2022
42
Posts
2
Kudos Received
8
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 5863 | 07-27-2022 12:05 PM | |
| 2581 | 07-25-2022 05:01 AM | |
| 1391 | 07-21-2022 02:03 AM | |
| 3136 | 07-21-2022 01:52 AM | |
| 1907 | 07-19-2022 10:48 AM |
07-20-2022
04:26 AM
Hi, I want to modify the ddl query table name. I am using the replaceText processor to modify the flowfile query table name. Currently I am able to achieve it because I have one source DB and only one table name I have changed in destination DB. Like sourceDBName: nrpsubscriotion and tableName: status -> destinationDBName: nrpresport and tableName: subscription_status. Now I have two source DB(1. nrpsubscriotion, 2. nrpuserorg) and both have same tableName: status. Now Source DB 2 , tableName in destination DB nrpreport has tableName: user_org_status. I have stuck here , I am not getting how can define a condition to replce a tableName in DDL based on schema name and ddl query able name: It should map like below: 1. SourceDBName: nrpsubscriotion and tableName: status -> destinationDBName: nrpresport and tableName: subscription_status. Flow File input: { "type" : "ddl", "timestamp" : 1658314685000, "binlog_gtidset" : "e75d07af-eb37-11ec-9d1d-a86daa745b08:1-206", "database" : "nrpsubscription", "table_name" : null, "table_id" : null, "query" : "ALTER TABLE `status` ADD COLUMN `is_test` VARCHAR(255) NULL AFTER `code`" } Expected output: { "type" : "ddl", "timestamp" : 1658314685000, "binlog_gtidset" : "e75d07af-eb37-11ec-9d1d-a86daa745b08:1-206", "database" : "nrpsubscription", "table_name" : null, "table_id" : null, "query" : "ALTER TABLE `subscription_status` ADD COLUMN `is_test` VARCHAR(255) NULL AFTER `code`" } 2. SourceDBName: nrpuserorg and tableName: status -> destinationDBName: nrpreport and tableName: user_org_status. { "type" : "ddl", "timestamp" : 1658314686000, "binlog_gtidset" : "e75d07af-eb37-11ec-9d1d-a86daa745b08:1-207", "database" : "nrpuserorg", "table_name" : null, "table_id" : null, "query" : "ALTER TABLE `status` ADD COLUMN `is_test` VARCHAR(255) NULL AFTER `code`" } Expected output: { "type" : "ddl", "timestamp" : 1658314686000, "binlog_gtidset" : "e75d07af-eb37-11ec-9d1d-a86daa745b08:1-207", "database" : "nrpuserorgdb", "table_name" : null, "table_id" : null, "query" : "ALTER TABLE `user_org_status` ADD COLUMN `is_test` VARCHAR(255) NULL AFTER `code`" }
... View more
Labels:
- Labels:
-
Apache NiFi
07-19-2022
10:48 AM
Hi , I have modified Sql statement using replace text processor but still getting below error when I alter a table and add new column. PutDatabaseRecord processor input json: { "type" : "ddl", "timestamp" : 1658252209000, "binlog_gtidset" : "e75d07af-eb37-11ec-9d1d-a86daa745b08:1-147", "database" : "nrpsubscriptiondb2306", "table_name" : null, "table_id" : null, "query" : "ALTER TABLE `nrpreportdb`.`user_org_status` ADD COLUMN `is_test` VARCHAR(255) NULL AFTER `code`" } Error log: Record had no (or null) value for Field Containing SQL: query, FlowFile StandardFlowFileRecord[uuid=bc7d51b3-e71f-4eb2-834b-fecbcff572be,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1658226664320-3, container=default, section=3], offset=37727, length=277],offset=0,name=bc7d51b3-e71f-4eb2-834b-fecbcff572be,size=277]
... View more
07-19-2022
10:32 AM
@rafy Above link is saying that to use one more processor PutSQL before PutDatabaseRecord to execute a statement. We already have PutDatabaseRecord processor to execute statement on db then why we should add putSQL processor? Also other than DDL if any other action(insert, update, delete) will be perform in source db, It will fail because putSql processor will expect sql statement to execute first and based on success next processor will be call. In this case provided link suggestion will not work.
... View more
07-19-2022
05:35 AM
Hi, I want to manipulate below string in updateAttribute processor. I tried replace function etc but it is not working for me. Can anyone suggest to modify below string to change DBName and TableName as per expected output. query = ${query:replace('nrpuserorgdb2306.status', 'nrpreportdb.user_org_status')} query : "ALTER TABLE `nrpuserorgdb2306`.`status` \nADD COLUMN `is_deleted` TINYINT NULL AFTER `code`" Expected output: query : ALTER TABLE `nrpreportdb`.`user_org_status` ADD COLUMN `is_test` TINYINT NULL AFTER `code`
... View more
Labels:
- Labels:
-
Apache NiFi
07-18-2022
12:52 AM
I have configured the NiFi flow file where I am reading mysql binary-log file. for me insert, update and delete flow is working fine but when I alter a table then I am getting error. sourceDBName: nrpsubscriptiondb2306 and tableName: product_types destinationDBName: nrpreportdb and tableName: product_types Alter table nifi error: Record had no (or null) value for Field Containing SQL: query, FlowFile StandardFlowFileRecord[uuid=8ca029c9-70aa-4e53-aec3-d37c15c8cd07,claim=StandardContentClaim [resourceClaim=StandardResourceClaim
... View more
Labels:
- Labels:
-
Apache NiFi
07-17-2022
11:32 PM
@alim, I have implemented the same flow file where I am ready mysql binary-log file. for me insert, update and delete flow is working fine but when I alter a table then I am getting error. sourceDBName: nrpsubscriptiondb2306 and tableName: product_types destinationDBName: nrpreportdb and tableName: product_types Alter table nifi error: Record had no (or null) value for Field Containing SQL: query, FlowFile StandardFlowFileRecord[uuid=8ca029c9-70aa-4e53-aec3-d37c15c8cd07,claim=StandardContentClaim [resourceClaim=StandardResourceClaim
... View more
07-17-2022
11:07 PM
@alim , I am using the captureChangeMySql processor to read the mysql binary-log file. This flow work fine for me when I configure the processor flow freshly but when I stop all the processor and start it gain then I am getting below error. Please suggest if you have any idea about the error issue. 2022-07-13 20:17:16,524 ERROR [Timer-Driven Process Thread-2] o.a.n.c.m.processors.CaptureChangeMySQL CaptureChangeMySQL[id=f5df756e-0181-1000-c210-1ab5c779f675] Processing failed org.apache.nifi.processor.exception.ProcessException: java.io.IOException: COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). This could indicate that your binlog position is invalid. at org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.onTrigger(CaptureChangeMySQL.java:721) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1283) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.io.IOException: COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). This could indicate that your binlog position is invalid. at org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.outputEvents(CaptureChangeMySQL.java:992) at org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.onTrigger(CaptureChangeMySQL.java:705) ... 10 common frames omitted 2022-07-13 20:17:16,740 INFO [blc-127.0.0.1:3306] c.g.shyiko.mysql.binlog.BinaryLogClient Connected to 127.0.0.1:3306 at mysql-bin.000025/12330 (sid:65535, cid:6)
... View more
07-17-2022
11:04 AM
Hi, how can I alter the table using NIFI Processor where source dbname : org, tableName:status, destination dbName:report and tableName: orgStatus. I am using captureChangeMySql Processor to read the binary-bin log file. Below json i am getting in updateAtribute processor: {"type":"ddl","timestamp":1658078637000,"binlog_filename":"mysql-bin.000030","binlog_position":2727,"database":"nrpsubscriptiondb2306","table_name":null,"table_id":null,"query":"ALTER TABLE `nrpuserorgdb2306`.`status` \nADD COLUMN `is_deleted` TINYINT NULL AFTER `code`"} Please find my processor screenshot for this flow.
... View more
Labels:
- Labels:
-
Apache NiFi
07-13-2022
09:47 AM
Can anyone suggest how to fix above issue. This issue come when i stop the all processor and again start the all processor then I can see the error at CaptureChangeMySql processor level error. 2022-07-07 15:10:58,307 ERROR [Timer-Driven Process Thread-8] o.a.n.c.m.processors.CaptureChangeMySQL CaptureChangeMySQL[id=d7f0536f-0181-1000-db94-5950dddc0439] Processing failed org.apache.nifi.processor.exception.ProcessException: java.io.IOException: COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). This could indicate that your binlog position is invalid. at Note: This issue get resolved when i just delete the existing CaptureChangeMySQL processor and again create and configure the CaptureChangeMySQL processor.
... View more
07-13-2022
09:41 AM
@araujo Can you please suggest how I can fix above issue. CaptureChangeMySql processor is giving value "{}" for BIT datatype. Processor out payload: { "type": "update", "timestamp": 1657194900000, "binlog_filename": "mysql-bin.000020", "binlog_position": 37029, "database": "nrpsubscriptiondb2306", "table_name": "user_subscription", "table_id": 137, "columns": [ { "id": 29, "name": "is_deleted", "column_type": -7, "last_value": "{}", "value": "{}" }, ] }
... View more