Pulling Change Data Capture (CDC) logs off a database can be challenging, but tools like Debezium and Kafka Connect make it easier. It can still be fiddly, though, and Cloudera's Streams Messaging Manager provides an interface to help simplify the setup process. At its core, Kafka Connect simply takes data from sources (think: relational databases, message queues, syslogs, etc.) and publishes that data to a Kafka topic. It can also read data from a Kafka topic and sink that data to another location (think: S3, an HTTP endpoint, a database, etc.). In other words, it connects sources to sinks using Kafka as the intermediate layer. For our purposes, we will set up a connection that reads CDC logs off a MySQL instance and publishes that data to Kafka.
Configuring MySQL
We'll need to do some basic setup on our MySQL instance, which is most easily done via SSH. Once logged into the MySQL host, connect to the MySQL instance:
mysql -uroot -p
We'll need to create a few users and a database that the connector will use later. You'll need the hostnames of the broker nodes in your Streams Messaging data hub in order to create the users correctly.
We need to create a user for each broker node in our data hub. Feel free to use a more secure password if that's your thing:
create user cdc_user@'cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
create user cdc_user@'cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
create user cdc_user@'cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
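The Debezium MySQL connector also expects its database user to have a handful of replication-related privileges so it can read the binlog. The exact set can vary by version, so treat the following as a sketch (one statement per broker host, mirroring the users above) and check the Debezium documentation for your release:
grant select, reload, show databases, replication slave, replication client on *.* to cdc_user@'cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site';
grant select, reload, show databases, replication slave, replication client on *.* to cdc_user@'cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site';
grant select, reload, show databases, replication slave, replication client on *.* to cdc_user@'cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site';
flush privileges;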
Next let's create a database and a table to demonstrate all this magic in action:
create database cdc_test;
use cdc_test;
create table t (
id bigint(20) auto_increment,
ts timestamp default current_timestamp on update current_timestamp,
primary key (id));
The final MySQL setup step we need to perform is to enable binary logging, which is done by adding these lines to /etc/my.cnf under the [mysqld] section. Any other settings in that file should not be touched.
[mysqld]
server_id = 1
log_bin = delta
binlog_format = row
binlog_row_image = FULL
server_id is somewhat arbitrary, and log_bin is just the prefix for the binlog files that get written to disk, so both can be whatever you like. binlog_format and binlog_row_image control the shape of the CDC payload.
MySQL must be restarted in order for those changes to take effect:
systemctl restart mysqld
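Once MySQL is back up, you can sanity-check that the settings took effect from the MySQL console (these are standard MySQL system variables, nothing specific to this setup):
show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image');
log_bin should report ON, binlog_format should be ROW, and binlog_row_image should be FULL.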
You'll want to make sure your MySQL instance allows inbound traffic on port 3306 from each of the nodes in your Streams Messaging data hub.
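A quick way to verify that rule is to test the port from one of the broker nodes, for example with nc (assuming it's available on the host, and substituting your own MySQL address, which appears again in the connector config below):
nc -vz 18.219.70.232 3306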
MySQL jar Setup
Kafka Connect needs the MySQL JDBC driver jar in order to connect to MySQL, and you must copy that jar to each of the nodes in the Streams Messaging data hub. Kafka Connect expects the jar to be in the /var/lib/kafka_connect_jdbc folder, expects it to be named mysql-connector-java.jar, and requires 644 permissions. You'll need superuser privileges to copy the file into this location. You may find it easier to create a recipe to automate these tasks at data hub spin-up.
sudo -i
cd /var/lib/kafka_connect_jdbc
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar
mv mysql-connector-java-8.0.29.jar mysql-connector-java.jar
chmod 644 mysql-connector-java.jar
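Remember that this has to be done on every broker node. If you'd rather not repeat it by hand, a rough sketch like the one below does the same thing in a loop (run from a machine that can SSH to the brokers, with the example hostnames from earlier swapped in for your own, and assuming your user can sudo on those hosts):
# download, rename, and set permissions on the driver jar on each broker node
for host in cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site \
            cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site \
            cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site; do
  ssh "$host" 'sudo sh -c "cd /var/lib/kafka_connect_jdbc && \
    wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar && \
    mv mysql-connector-java-8.0.29.jar mysql-connector-java.jar && \
    chmod 644 mysql-connector-java.jar"'
done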
How did we know it needed to be in that location with that exact name? It's spelled out in the documentation (https://docs.cloudera.com/runtime/7.2.15/kafka-connect/topics/kafka-connect-connector-debezium-mysql...), and it can also be found within Cloudera Manager for your Streams Messaging data hub, under the Kafka configuration.
Once the jars are in place, you will need to restart Kafka services within Cloudera Manager.
Configuring Kafka Connect
From your Streams Messaging data hub, open up the Streams Messaging Manager UI, and navigate to Connect on the left hand navigation bar.
Then click the button to create a New Connector.
And select the MySqlConnector (#2 pictured below) as a Source Template (#1 pictured below). This opens a configuration pane with a series of key/value pairs for the connector configuration. The default configuration is probably enough for a cluster without any security or Kerberos in place, but CDP Public Cloud is fully secured, so we will need to add quite a bit to this configuration. You can set these config parameters manually in this interface, or you can upload an existing configuration, which is what we'll do here.
Click Import Connector Configuration (#3 pictured above) and paste the JSON below into the window, making the necessary changes to reflect your cluster & MySQL details.
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"name": "CDC-Test",
"database.history.kafka.topic": "schema-changes.inventory-mysql",
"database.history.kafka.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.producer.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.consumer.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.producer.ssl.truststore.password": "${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}",
"database.history.consumer.ssl.truststore.password": "${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}",
"database.history.producer.ssl.truststore.type": "jks",
"database.history.consumer.ssl.truststore.type": "jks",
"database.history.producer.ssl.truststore.location": "${cm-agent:ENV:CONF_DIR}/cm-auto-global_truststore.jks",
"database.history.consumer.ssl.truststore.location": "${cm-agent:ENV:CONF_DIR}/cm-auto-global_truststore.jks",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.kerberos.service.name": "kafka",
"database.history.consumer.sasl.kerberos.service.name": "kafka",
"database.history.producer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"${cm-agent:ENV:CONF_DIR}/kafka.keytab\" principal=\"kafka/${cm-agent:ENV:HOST}@SE-SANDB.A465-9Q4K.CLOUDERA.SITE\";",
"database.history.consumer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"${cm-agent:ENV:CONF_DIR}/kafka.keytab\" principal=\"kafka/${cm-agent:ENV:HOST}@SE-SANDB.A465-9Q4K.CLOUDERA.SITE\";",
"database.allowPublicKeyRetrieval": "true",
"database.user": "cdc_user",
"database.password": "Password1#",
"database.include.list": "cdc_test",
"database.server.id": "184054",
"database.server.name": "cdc_test",
"database.hostname": "18.219.70.232",
"database.port": "3306",
"tasks.max": "1"
}
There are literally a zillion parameters you can supply when creating the connector depending on how sophisticated you want your flow to be. What we have here is a "basic" configuration for a kerberized kafka cluster, but Debezium is much more powerful than that. It is left as an exercise to the reader to explore some of the wilder features, like inline field transformations.
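As one small taste of those features (offered purely as a hedged sketch, not something this walkthrough depends on), Debezium ships single message transforms such as ExtractNewRecordState, which flattens the before/after envelope into a plain row; adding keys along these lines to the JSON above would enable it:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"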
Many of the database configuration settings are self-explanatory, but further information can be found in the Debezium MySQL Connector documentation: https://debezium.io/documentation/reference/1.8/connectors/mysql.html#mysql-required-connector-confi...
Security Setup within Kafka Connect
Documentation is helpful, but knowing how to set the security-related parameters is something of a dark art. Here is how you can find the appropriate settings for your cluster. It will require privileged access to read these locations, so make sure you can gain root access.
jaas.config
Inspect the contents of the jaas.conf file to find the correct jaas.config settings.
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_BROKER/jaas.conf
Note the wildcard in the path; each cluster and each node within the cluster will have a different prefix value in that file path. The file has several sections, but our interest is in the Client section near the bottom of the file.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/run/cloudera-scm-agent/process/1546337495-kafka-KAFKA_BROKER/kafka.keytab"
principal="kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE";
};
Everything inside those braces is what we'll use for our jaas.config. You'll note again that the path, which in my case includes 1546337495-kafka-KAFKA_BROKER, is specific to each node. We can only supply one keytab path here, so we need a way to accommodate the fact that the path is slightly different on each node. In a production deployment you would likely put your keytabs in a less volatile location, but for our purposes we will leverage an environment variable for this path: ${cm-agent:ENV:CONF_DIR}. Note that the principal also references an individual node of my cluster; we can replace that with another environment variable, ${cm-agent:ENV:HOST}.
truststore, keystore, security.protocol, & kerberos.service.name
These values can all be found in the same location, and combined with grep you can get the individual values you'll need. Several of these will make use of similar environment variable swaps for the paths:
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.location
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.keystore.location
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep security.protocol
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep sasl.kerberos.service.name
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.type
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.password
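On my cluster, those greps came back with values that map directly onto the settings used in the JSON above; roughly something like the following (your truststore path prefix will differ, and the truststore password is handled separately below):
ssl.truststore.location=/run/cloudera-scm-agent/process/.../cm-auto-global_truststore.jks
ssl.truststore.type=jks
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka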
truststore.password
The truststore.password requires a different environment variable called ${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}. Again, in a production deployment you might not handle your keystore & truststore this way, but it works well for evaluation purposes.
bootstrap.servers
Debezium is going to write the CDC logs & other CDC metadata to kafka topics, so we need to specify our kafka brokers. The Streams Messaging data hub includes kafka, and Connect has an environment variable ${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS} that points back to those broker addresses.
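If you're curious what that variable resolves to on your cluster, the same connect-distributed.properties file from earlier lists the broker addresses (the exact host:port list will be specific to your environment):
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep bootstrap.servers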
Validate the Configuration
Once the configuration is complete, you must validate it before you can deploy it. Validation ensures the necessary jars are in place, confirms the security parameters are correct, and checks connectivity to the remote database.
If the validation process finds any issues, they will be presented either in the configuration box, or possibly as a red alert box in the upper right corner of your browser window. Once you have a validated configuration, click Next, and then click Deploy.
See it in Action!
From the MySQL console, insert some data into the table we created earlier.
mysql> insert into t (id) values (100);
Query OK, 1 row affected (0.00 sec)
mysql> select * from t;
+-----+---------------------+
| id  | ts                  |
+-----+---------------------+
| 100 | 2022-07-15 15:28:53 |
+-----+---------------------+
1 row in set (0.00 sec)
mysql>
If everything is working, we should see a corresponding message in a Kafka topic named after the database.server.name property, the source database name, and the table name. Since both our server name and database are cdc_test and the table is t, the topic is named cdc_test.cdc_test.t, and we see it in Streams Messaging Manager:
And using the data explorer in Streams Messaging Manager, we can inspect the actual CDC payload. It is a JSON object with a lot of useful information, but of particular interest to us is the payload key which includes a before & after view of the row. Since we inserted a new row, the before version is null and the after version has the data we inserted.
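For reference, here is an abridged sketch of what that payload section looks like for the insert above (trimmed for readability; the real message also carries source metadata, timestamps, and schema information, and exact field formats can vary by connector version):
"payload": {
  "before": null,
  "after": {
    "id": 100,
    "ts": "2022-07-15T15:28:53Z"
  },
  "op": "c"
}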
Closing Thoughts
Once you have a steady stream of CDC log data, you're off to the races. A common pattern would be to use NiFi to consume the topic and apply transformations, but you could instead use a Kafka Connect sink connector to read the topic and send the data to a new destination. You've already done the hard part, which is getting a stream of database changes. Now go do something cool with it!