CaptureChangeMySQL not working in NiFi 1.2

Explorer

I downloaded NiFi 1.2 and tried the new MySQL CDC processor, CaptureChangeMySQL. I configured both MySQL and the CaptureChangeMySQL processor, but no change data came out of the SUCCESS relationship. There were no errors or warnings in the log files either.

When I set the Include Begin/Commit Events property to TRUE, I got a few flow files, but they contained only event details such as {"type":"commit","timestamp":1495114883000,"binlog_filename":"mysql-bin.000001","binlog_position":8155,"database":"test_cdc"}, not the actual changed data.

I am not sure what configuration is missing for the CaptureChangeMySQL processor to produce the correct output.

Let me know if anyone has successfully tested the CaptureChangeMySQL processor in NiFi 1.2.

Thanks in advance.

8 REPLIES

Master Guru

Have you performed any INSERT, UPDATE, DELETE events since you enabled binary logging? You probably don't need to include Begin/Commit events unless you are doing auditing or your target DB needs them.

In general, should you ever want to "reset" the CDC processor to get all the binlog records, set Retrieve All Records to true and clear the state of the processor (i.e. right-click on the stopped processor, choose View State, then Clear State).

Explorer

Thanks Matt for the quick reply.

I have tried INSERT, UPDATE, and DELETE events, but it didn't work. You are correct: since we don't need auditing, I will set it to FALSE. I enabled it only for testing, so that I at least got some flow files as output 🙂

I also tried the reset with Retrieve All Records and clearing the state, but that didn't help either.

I even downloaded the source code of the CaptureChangeMySQL processor and tried to run the unit tests, but the unit test class is in Groovy and I was not able to compile and run it. I am not sure why the unit test class is in Groovy when everything else is written in Java.

Guru

Hi @Vidyadhar Salunkhe

I'm not sure why the INSERT/UPDATE/DELETE events are not showing up in your queue, especially if they exist in your bin log. But I can confirm that I have the CaptureChangeMySQL processor working for those events in my NiFi 1.2 instance.

With logging turned on in MySQL, I created a table and added one row to it. With Begin/Commit Events set to TRUE in the processor config, the processor generated 4 FlowFiles as expected:

  • 1 for ddl event for the table creation
  • 1 begin event
  • 1 for the insert event for the row creation
  • 1 commit event

15589-cdc-processor-success.png

Looking at the contents of the queue, this is the detail for the FlowFile for the Insert event:

15590-insert-event-flowfile.png

Can you double-check that the bin log that the processor is referencing (the binlog.filename displayed in View State) contains your insert/delete/update events?

15591-view-state-cdc.png

New Contributor

I'm having trouble with CaptureChangeMySQL in NiFi when I try it with AWS Aurora (MySQL). In my local MySQL installation I had permission to add log_bin = 1 and binlog_format=row, as well as log_bin = test1, binlog_do_db = source, and server_id = 1, to my.cnf. In AWS Aurora, I can set the binlog_format=row and log_bin=1 options in the parameters, but I still get an error when the flow runs. Log attached:

2017-10-26 16:55:03,011 ERROR [Timer-Driven Process Thread-8] o.a.n.c.m.processors.CaptureChangeMySQL CaptureChangeMySQL[id=c36acae4-6dd9-3c4c-e433-4da30518edee] Binlog connector communications failure: Could not find first log file name in binary log index file: com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:882)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
    at java.lang.Thread.run(Thread.java:748)

Has anyone seen the same behavior?

Explorer

Thanks Alim for the reply.

I could see the entries in the bin log file; however, no events were generated 😞

Could you share your configuration for MySQL and the NiFi CaptureChangeMySQL processor?

Please also share the output of the events when viewing them. I think it would be in JSON format.

Thanks in advance.

Guru

For MySQL, in the my.cnf config file I added the following lines:

log_bin = test1
binlog_do_db = source
server_id = 1

Thus, any bin log files generated would begin with "test1" and events would only be generated for the "source" database.

Here are the configuration details for the processor:

15593-cdc-processor-configuration.png

Here is the output for each of the 4 events:

{ "type" : "ddl", "timestamp" : 1495207919000, "binlog_filename" : "test1.000001", "binlog_position" : 219, "database" : "source", "table_name" : null, "table_id" : null, "query" : "create table users (id mediumint(9) not null auto_increment primary key, first text, last text)" }

{ "type" : "begin", "timestamp" : 1495207933000, "binlog_filename" : "test1.000001", "binlog_position" : 457, "database" : "source" }

{ "type" : "insert", "timestamp" : 1495207933000, "binlog_filename" : "test1.000001", "binlog_position" : 585, "database" : "source", "table_name" : "users", "table_id" : 109, "columns" : [ { "id" : 1, "name" : "id", "column_type" : 4, "value" : 1 }, { "id" : 2, "name" : "first", "column_type" : -1, "value" : "Andrew" }, { "id" : 3, "name" : "last", "column_type" : -1, "value" : "Lim" } ] }

{ "type" : "commit", "timestamp" : 1495207933000, "binlog_filename" : "test1.000001", "binlog_position" : 637, "database" : "source" }
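For anyone consuming these FlowFiles downstream, here is a minimal Python sketch of turning the insert event above into a plain row. The helper name event_to_row is made up for illustration and is not part of NiFi; it only relies on the fields visible in the sample output ("type", "table_name", "columns", "name", "value").

```python
import json

# The insert event from the output above, copied verbatim.
INSERT_EVENT = """
{ "type" : "insert", "timestamp" : 1495207933000, "binlog_filename" : "test1.000001",
  "binlog_position" : 585, "database" : "source", "table_name" : "users", "table_id" : 109,
  "columns" : [ { "id" : 1, "name" : "id", "column_type" : 4, "value" : 1 },
                { "id" : 2, "name" : "first", "column_type" : -1, "value" : "Andrew" },
                { "id" : 3, "name" : "last", "column_type" : -1, "value" : "Lim" } ] }
"""


def event_to_row(event):
    """Return {column name: value} for an insert event, or None for any other event type.

    Hypothetical helper: it assumes the event layout shown in the sample output.
    """
    if event.get("type") != "insert":
        return None
    return {col["name"]: col["value"] for col in event.get("columns", [])}


event = json.loads(INSERT_EVENT)
row = event_to_row(event)
print(event["table_name"], row)
```

Begin, commit, and ddl events carry no "columns" array in the sample, so the helper returns None for them rather than guessing at a row.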

Master Guru

I believe you'll also need the following in your my.cnf:

binlog_format=row
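As a quick sanity check, the settings mentioned in this thread can be verified against a my.cnf fragment with a small Python sketch like the one below. The function names parse_mycnf and missing_cdc_settings are hypothetical, and the check covers only the three settings discussed here (log_bin, server_id, binlog_format=row); it is not a complete list of what the processor may require.

```python
def parse_mycnf(text):
    """Parse 'key = value' lines from a my.cnf fragment into a dict (section headers ignored)."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and [section] headers.
        if not line or line.startswith(("#", ";", "[")):
            continue
        key, _, value = line.partition("=")
        # MySQL accepts dashes and underscores interchangeably in option names.
        settings[key.strip().replace("-", "_").lower()] = value.strip()
    return settings


def missing_cdc_settings(settings):
    """List the settings from this thread that are absent or not set as discussed."""
    problems = []
    if "log_bin" not in settings:
        problems.append("log_bin")
    if "server_id" not in settings:
        problems.append("server_id")
    if settings.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format=row")
    return problems


# The my.cnf lines posted earlier in the thread, before binlog_format was added.
MY_CNF = """
log_bin = test1
binlog_do_db = source
server_id = 1
"""

print(missing_cdc_settings(parse_mycnf(MY_CNF)))  # prints ['binlog_format=row']
```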

Explorer

Thanks a lot, Matt. The last change, binlog_format=row, was required and was missing in my case.

Now it is working fine. I am able to see the events in the flow file.

I have one more question: in "column_type" : 4, is the value 4 a data type? What are the standard data types, and are they specific to each database or universally common? Is it linked to java.sql.Types?

Let me know.
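On the column_type question, the values in the sample output are at least consistent with java.sql.Types: there, 4 is INTEGER (the mediumint id column) and -1 is LONGVARCHAR (the two text columns). The thread does not confirm that the processor uses these codes, so the Python sketch below treats it as an assumption. The integer constants themselves are the standard JDBC values, and only a subset is listed.

```python
# A subset of java.sql.Types constants (name -> integer code) from the JDBC API.
# Assumption: "column_type" in the CDC events holds one of these codes.
JDBC_TYPES = {
    "BIT": -7, "TINYINT": -6, "SMALLINT": 5, "INTEGER": 4, "BIGINT": -5,
    "FLOAT": 6, "REAL": 7, "DOUBLE": 8, "NUMERIC": 2, "DECIMAL": 3,
    "CHAR": 1, "VARCHAR": 12, "LONGVARCHAR": -1,
    "DATE": 91, "TIME": 92, "TIMESTAMP": 93,
    "BINARY": -2, "VARBINARY": -3, "LONGVARBINARY": -4,
    "BLOB": 2004, "BOOLEAN": 16,
}

# Reverse lookup: integer code -> type name.
CODE_TO_NAME = {code: name for name, code in JDBC_TYPES.items()}


def jdbc_type_name(code):
    """Return the java.sql.Types name for a code, or a placeholder if it is not in the subset."""
    return CODE_TO_NAME.get(code, "UNKNOWN({})".format(code))


# The two column_type values that appear in the sample insert event.
for code in (4, -1):
    print(code, jdbc_type_name(code))
```

If the assumption holds, the codes are defined by JDBC rather than by MySQL, so the same numbers would apply to other databases accessed through JDBC.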