Member since: 02-25-2022
Posts: 9
Kudos Received: 17
Solutions: 0
09-29-2022
08:02 AM
1 Kudo
Are you asking how to import the template into NiFi, or how to drop a template you've already loaded onto the canvas? Those are two distinct actions.
09-29-2022
07:34 AM
3 Kudos
If you made it this far, you probably already know why you're here. Connecting DBeaver to an Impala virtual warehouse isn't difficult, but there are a few gotchas that can make it frustrating. So let's conquer those obstacles.
Provision an Impala Virtual Warehouse
Log into a CDP instance
Navigate to Data Warehouse
Enable your Data Warehouse Environment & Database Catalog
Create a New Virtual Warehouse
Give your virtual warehouse a unique name
Select Impala
Select your database catalog from the dropdown
SSO may be enabled or disabled, the choice is yours!
Set Availability Zone and User Groups per your requirements or leave as-is
Pick the size (aka decide how much money you want to spend)
The remaining options are going to depend on your needs, but the defaults are fine for our purposes.
Click Create. Expect approximately 5 minutes to create your virtual warehouse.
Download the JDBC Driver
From your virtual warehouse tile, click on the kebab icon in the upper right. You'll find all sorts of fun options under there, but we're primarily interested in the Download JDBC/ODBC Driver option, which will download the Impala driver to your local machine. You can leave it in your Downloads folder, or move it to wherever you like to store your jars. It will be named similar to this:
impala_driver_jdbc_odbc.zip
You'll need to unzip it, which will create a new folder named impala_driver_jdbc_odbc. Inside that folder will be two additional folders; we're interested in the JDBC folder, named something like this:
ClouderaImpala_JDBC-2.6.23.1028
Within that folder will be the actual Impala drivers, named for JDBC versions 4.1 and 4.2. More info on these versions can be found here. You don't need to unzip these any further.
ClouderaImpalaJDBC41-2.6.23.1028.zip
ClouderaImpalaJDBC42-2.6.23.1028.zip
Copy the JDBC URL
Again, from the kebab icon in your virtual warehouse tile, copy the JDBC URL. This URL has all the necessary information to make the connection, and should be of the form:
jdbc:impala://coordinator-cnelson2-impala-vdw.dw-se-sandboxx-aws.a465-9q4k.cloudera.site:443/default;AuthMech=12;transportMode=http;httpPath=cliservice;ssl=1;auth=browser
Create a New Impala Connection in DBeaver
Next, we will create a new connection within DBeaver.
Create a new connection in DBeaver, selecting Cloudera Impala as the database driver.
Click Edit Driver Settings to tweak the URL template:
Remove the jdbc:impala:// prefix from the URL template
Remove the :{port} from the URL template
The new URL template should look like this: {host}/{database}
Click OK
For the Host, paste the JDBC URL you copied earlier.
Leave the port empty
Set the Database/Schema to the name of the database you want to connect to (e.g. default)
Username/Password:
If the warehouse is SSO-enabled, use your SSO credentials.
If the warehouse is not SSO-enabled, use your CDP workload credentials.
Add the Impala driver
Click on the Edit Driver Settings button
DBeaver may have installed with a driver for Impala, but you may find it to not be fully compatible with your virtual warehouse. Open the Libraries tab and Delete the existing drivers to avoid any conflict.
Click Add File to add the Impala driver (the 41 or 42 zip file) you downloaded earlier.
Click OK
Click Test Connection and verify that you can connect. If your virtual warehouse is SSO-enabled, DBeaver will open a browser tab to allow you to authenticate if you aren't already so authenticated.
Click Finish to save the new connection.
Once the connection is created, you can navigate the database, tables, columns, etc., as well as query your data. Congratulations, you did it (and there was much rejoicing).
Tips
If you have connectivity issues even after successfully testing your connection, try an Invalidate/Reconnect or a full Disconnect + Reconnect to reset the connection.
If a query seems to take a long time to run, check the status of your virtual warehouse; it is likely that it was stopped and needs to restart to execute the query.
You may need your cloud firewall rules set to allow traffic on port 443 from your IP address.
You might also be interested in my eerily similar article on connecting DBeaver to a Hive Virtual Warehouse.
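For reference, the driver unpacking from the Download the JDBC Driver section amounts to something like this in a terminal. A minimal sketch; the folder and version numbers shown are the ones from this walkthrough and may differ for you:
cd ~/Downloads
unzip impala_driver_jdbc_odbc.zip
ls impala_driver_jdbc_odbc/ClouderaImpala_JDBC-2.6.23.1028/
# ClouderaImpalaJDBC41-2.6.23.1028.zip  ClouderaImpalaJDBC42-2.6.23.1028.zip
# Pick either the 41 or 42 zip to add in DBeaver's Libraries tab; no further unzipping needed.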
09-06-2022
10:16 AM
2 Kudos
Imagine a scenario where you're using CDP to host your Kafka environment, but you're also using Snowflake and need to get data from here to there with as little effort as possible. It turns out that CDP plays very nicely with 3rd party vendors such as Snowflake. Let's find out how, through Streams Messaging Manager & Kafka Connect.
Prerequisites:
Streams Messaging data hub
Snowflake account (if you don't have one, you can set up a free trial, which will work fine for this)
30 minutes of free time
Installing the Snowflake Sink Connector
The default setup for a Streams Messaging data hub in CDP includes many Kafka Connect sources & sinks, but obviously every connector can't be included. To use a 3rd party connector you need to download the jar(s) to each broker node in your Streams Messaging cluster.
Where exactly to put the jar files depends on what sort of jar it is. If it is completely contained in one "fat" jar, then it can go into the plugin path defined in Cloudera Manager, which is set to /var/lib/kafka by default. If there are dependencies on other jars, then all of the jars (including the fat jar, which obviously isn't quite as fat as you need it to be) need to be placed in a subfolder under the plugin path. Whether it is a fat jar or one with dependencies, the jars themselves all need 644 permissions; the subfolder additionally needs execute rights, so it gets 755 permissions.
These actions require root access, so it might make more sense to build a recipe to have them performed for you when the Streams Messaging data hub is created (i.e. if you are unable to sudo on the broker nodes). However, if your cluster is already running, you'll need to perform these steps manually on each broker node. You may have to modify the version numbers to suit your needs. See the Cloudera documentation for more info on Kafka Connect connectors.
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.8.0/snowflake-kafka-connector-1.8.0.jar -P /var/lib/kafka/snowflake/
wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -P /var/lib/kafka/snowflake/
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -P /var/lib/kafka/snowflake/
chmod 755 /var/lib/kafka/snowflake
chmod 644 /var/lib/kafka/snowflake/*
Once the jars are in place on every broker node, you'll need to restart the Kafka Connect roles from within Cloudera Manager: Clusters --> Kafka, and then find the Kafka Connect roles under the Instances tab. Check all of them and click Restart.
Once the roles have restarted, open up the Kafka Connect UI within Streams Messaging Manager, start a New Connection, and navigate to the Sink Templates tab. You should now see a SnowflakeSinkConnector, ready for you to sink data through. So let's put it to work. If you don't see the SnowflakeSinkConnector here, verify that the jars are on all your broker nodes, that the permissions are correct, and that the Kafka Connect roles have been restarted across the entire cluster.
Preparing Snowflake to Receive Sink'd Data
This is not intended to be a complete treatise on Snowflake operations; it is a simple sample case that lands data in a Snowflake table to demonstrate the Kafka Connect functionality.
To allow remote connections from something like Kafka Connect, public & private keys must be created and attached to your Snowflake user. This is all detailed in the Snowflake docs (https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication), but we'll go through it here now very quickly and with much hand-waving.
From your local machine, run these steps to create the public & private keys:
openssl genrsa -out snowflake_key.pem 2048
openssl rsa -in snowflake_key.pem -pubout -out snowflake_key.pub
If you need to use an encrypted private key instead, use these steps:
openssl genrsa -out snowflake_key 4096
openssl rsa -in snowflake_key -pubout -out snowflake_key.pub
openssl pkcs8 -topk8 -inform pem -in snowflake_key -outform PEM -v2 aes-256-cbc -out snowflake_key.p8
It will ask you for a passphrase, which you can set to whatever you like. You'll use this when you configure the connector.
You'll need the public key to create your user within Snowflake, so cat the contents of snowflake_key.pub and copy everything between the "-----BEGIN PUBLIC KEY-----" and "-----END PUBLIC KEY-----". Use that public key when creating your user as shown below. Be sure to remove all the line feeds, so that your public key is a single line of text (it's ok if the Snowflake UI wraps the line).
Next we'll set up some Snowflake objects to receive data from the sink connector. Run these commands in Snowflake to create the necessary objects & permissions, making sure to change the username to match the Snowflake user you created above. There are likely multiple ways to achieve this end goal in Snowflake, and all of this might not even be necessary. Consult your local Snowflake Expert for more information.
use role accountadmin;
create database sink_db;
create role sink_role;
create schema sink_db.sink_schema;
create table sink_table (record_metadata variant, record_content variant);
create warehouse sink_warehouse with warehouse_size = 'XSMALL' warehouse_type = 'STANDARD' auto_suspend = 300 auto_resume = true;
grant usage on database sink_db to role sink_role;
grant usage on schema sink_db.sink_schema to role sink_role;
grant create table on schema sink_db.sink_schema to role sink_role;
grant create stage on schema sink_db.sink_schema to role sink_role;
grant create pipe on schema sink_db.sink_schema to role sink_role;
grant ownership on table sink_table to role sink_role;
grant usage on warehouse sink_warehouse to role sink_role;
grant select on sink_table to role public;
grant role sink_role to user cnelson;
alter user cnelson set default_role = sink_role;
The net result is that you should now have an empty table with two variant columns, just waiting to have some Kafka data sunk into it.
Configuring the Snowflake Sink Connector
Configuring the Snowflake connector is fairly straightforward, but you may want to import this configuration boilerplate, which will get you 85% of the way there.
{
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "4",
"topics": "sample.topic",
"snowflake.topic2table.map": "sample.topic:sink_table",
"buffer.count.records": "10000",
"buffer.flush.time": "60",
"buffer.size.bytes": "500000",
"snowflake.url.name": "",
"snowflake.user.name": "",
"snowflake.private.key": "",
"snowflake.database.name": "SINK_DB",
"snowflake.schema.name": "SINK_SCHEMA",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
If you used an encrypted private key when you created your public/private keys, you will also need to add this configuration item with the passphrase you used. You can either carefully add it to the JSON before you import, or use the (+) button to add the property after import.
"snowflake.private.key.passphrase": ""
That leaves three configuration items that need to be completed.
snowflake.user.name: this is the name of the user you created earlier.
snowflake.private.key: this will be the contents of the pem or p8 private key file you created earlier. Only the content between the BEGIN/END tags is needed. Just paste that whole hot mess into the field.
snowflake.url.name: from Snowflake, change your role to ORGADMIN and run SHOW ORGANIZATION ACCOUNTS; this will return an account_url, which is what you'll use here.
Once the configuration is complete, click on the Validate button and hope for the best. Individual field errors will show inline, but jar-related errors will be shown above the configuration section. If you see NoClassDefFound-type errors, check that you have the Snowflake connector on every broker node, in the correct path, with 644 permissions. If you get a Bouncy Castle related error, check that you have the two encryption-related jars (and the Snowflake fat jar) in a subfolder under /var/lib/kafka, that the directory permissions are 755, and that you restarted the Kafka Connect roles after you made those changes.
Once the configuration is valid, click Next and deploy the connector. And then watch nothing happen, because we don't have any data flowing yet.
Testing the Connector
The easiest way (probably) to test the connector is by using the kafka-console-producer via ssh'ing into one of the broker nodes. And yes, I completely hand-waved over the complications of configuring the SSL properties for the console producer. For info on how to set that up, see this article: https://community.cloudera.com/t5/Community-Articles/Configuring-Kafka-Console-Producer-Consumer-on-a-Secured-CDP/ta-p/351341
The console producer only requires minimal information to produce messages, primarily consisting of the brokers and the topic to which you want to publish. The producer.config file handles the SSL configuration and is detailed here. The topic we're using is the same topic we listed in the connector configuration parameter topics and also in snowflake.topic2table.map.
kafka-console-producer --broker-list cnelson2-streams-snowflake-corebroker0.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker1.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker2.se-sandb.a465-9q4k.cloudera.site:9093 \
--topic sample.topic \
--producer.config kafka-ssl.config
Once it gets rolling, it quietly waits for user input to publish to the topic; our Kafka Connect Snowflake sink connector will then consume those messages and send them to our Snowflake table. Let's see it in action.
Messages are produced.... and eventually you can see them in Snowflake. Note that the records will show up in your Snowflake table in "near real time," which in the real world means within a minute or so, but probably depends on a number of factors.
From here, it's up to you to monetize your new skills.
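If you'd rather script the test than type messages interactively, piping a few records into the console producer works too. A minimal sketch, assuming the same kafka-ssl.config and sample.topic as above; the broker list is abbreviated with placeholders, and the sample JSON values are made up:
printf '%s\n' '{"id": 1, "flavor": "vanilla"}' '{"id": 2, "flavor": "chocolate"}' | kafka-console-producer \
  --broker-list <broker0>:9093,<broker1>:9093,<broker2>:9093 \
  --topic sample.topic \
  --producer.config kafka-ssl.config
# Each line becomes one record; the SnowflakeJsonConverter lands it in the RECORD_CONTENT variant column of sink_table.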
09-01-2022
09:40 AM
2 Kudos
The Kafka console producer & consumer client utilities are very simple to use in an unsecured environment, such as with many legacy CDH clusters or a standalone install. However, CDP Public Cloud spins up secured by default, which means that we must get our feet wet with Kerberos if we want to use the Kafka client tools. Kerberos isn't the easiest topic, but thankfully we won't need to get too deep into it. This isn't meant to be a Kerberos primer; we'll cover just enough to achieve the minimal goal of producing and consuming basic messages.
Pre-requisites
A CDP environment
A Streams Messaging data hub
root access to the data hub hosts
Building a Kafka Configuration
The Kafka client utilities require a number of SSL-related properties in order to talk to Kafka. We'll put the properties into a configuration file named kafka-ssl.config for ease of use, which will look something like this boilerplate:
security.protocol = SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name = kafka
ssl.truststore.type = jks
ssl.truststore.location = ""
ssl.truststore.password = ""
sasl.jaas.config =
Fortunately these values can all be found on the server if we know where to look, although we need to gain root access to find most of them.
truststore location:
ls -lart /run/cloudera-scm-agent/process/*kafka-KAFKA_BROKER/cm-auto-global_truststore.jks
Note the wildcard in the path. Every Streams Messaging data hub will have a slightly different path here, so we need the wildcard to find it.
truststore password:
cat /var/run/cloudera-scm-agent/process/*kafka-KAFKA_BROKER/proc.json | grep KAFKA_BROKER_TRUSTORE_PASSWORD
Note the spelling of "TRUSTORE" here. (It must be Italian.)
keytab location:
ls -lart /run/cloudera-scm-agent/process/*kafka-KAFKA_BROKER/kafka.keytab
jaas.config:
cat /run/cloudera-scm-agent/process/*kafka-KAFKA_BROKER/jaas.conf
The jaas.conf is actually a collection of configurations referencing various login modules, but we will use the Krb5LoginModule section under KafkaServer. You can copy & paste that section into the sasl.jaas.config setting of our Kafka configuration.
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/var/run/cloudera-scm-agent/process/1546335853-kafka-KAFKA_BROKER/kafka.keytab"
principal="kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE";
org.apache.kafka.common.security.scram.ScramLoginModule required
;
org.apache.kafka.common.security.plain.PlainLoginModule required
ldap_url="ldaps://ldap.se-sandb.a465-9q4k.cloudera.site:636"
user_dn_template="uid={0},cn=users,cn=accounts,dc=se-sandb,dc=a465-9q4k,dc=cloudera,dc=site";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/run/cloudera-scm-agent/process/1546335853-kafka-KAFKA_BROKER/kafka.keytab"
principal="kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE";
};
Using all the pieces we just found, your completed configuration should look something like this (your specific truststore password, Kafka broker paths, and Kerberos principal will be unique to your data hub), which I wrote to a file called kafka-ssl.config. Remember to use continuation slashes, and don't miss the trailing semicolon after the principal when pasting the jaas.config.
security.protocol = SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name = kafka
ssl.truststore.type = jks
ssl.truststore.location = /run/cloudera-scm-agent/process/1546335853-kafka-KAFKA_BROKER/cm-auto-global_truststore.jks
ssl.truststore.password = 456sbar15i29vaicvcv3s6f9o4
sasl.jaas.config = \
com.sun.security.auth.module.Krb5LoginModule required \
doNotPrompt=true \
useKeyTab=true \
storeKey=true \
keyTab="/var/run/cloudera-scm-agent/process/1546335853-kafka-KAFKA_BROKER/kafka.keytab" \
principal="kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE"; Verify Kerberos Ticket We should first check if we have a valid Kerberos ticket by issuing a klist: If no valid ticket is returned, we'll have to generate one. The first step is to identify our principal by reading the keytab via ktutil using the keytab & principal we found earlier. The astute reader will recognize that the principal is also found in the jaas.config, but it's handy to know how to use ktuil, so I'll demonstrate that just because. It will list several principals, but we're looking for the one for kafka. Once we have identified the principal, we can use kinit to create a Kerberos ticket. kinit -kt PATH_TO_KEYTAB FULL_PRINCIPAL Now that we have a valid ticket, we can think about actually using the Kafka client utilities. Actually Produce & Consume Messages kafka-console-producer & kafka-console-consumer are tools that are installed on the broker nodes of your Streams Messaging cluster. They require minimal information to get the basics working, primarily consisting of the FQDN of the brokers (remember SSL uses port 9093), and the topic to which you want to produce. The consumer additionally needs a consumer group. If we run a producer in one window and a consumer in another window you can verify both are working in real time. Of course there are many other command line options you can supply, but these are the bare minimum to get it flowing. In one terminal window run this to begin producing: kafka-console-producer --broker-list cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site:9093 \
--topic test.topic \
--producer.config kafka-ssl.config
And in another terminal window run this to begin consuming:
kafka-console-consumer --bootstrap-server cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site:9093 \
--topic test.topic \
--consumer.config kafka-ssl.config \
--group myConsumerGroup \
--from-beginning
Once the producer is up and running, you can simply type messages into the console and you will see them show up in the consumer window.
Producing & consuming, with a dose of truth.
And there you have it.
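For reference, the ticket workflow from the Verify Kerberos Ticket section looks roughly like this end to end. A minimal sketch using the keytab path, broker FQDN, and realm from this walkthrough; the numeric process directory id is a placeholder and will differ on your nodes:
# run as root, since the keytab is root-owned
klist
# No valid ticket? Read the keytab to find the kafka principal:
ktutil
#   ktutil:  read_kt /run/cloudera-scm-agent/process/<process-id>-kafka-KAFKA_BROKER/kafka.keytab
#   ktutil:  list
#   ktutil:  quit
# Then create a ticket for that principal and confirm it:
kinit -kt /run/cloudera-scm-agent/process/<process-id>-kafka-KAFKA_BROKER/kafka.keytab \
  kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE
klist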
08-16-2022
07:59 AM
2 Kudos
Oftentimes data hubs will need additional steps performed in order to support your specific workload or use case. For me this is usually something like adding a jar to enable database connectivity on a Data Flow or Streams Messaging hub. Recently I was working with Kafka Connect on a Streams Messaging data hub for MySQL change data capture. This required uploading a particular jar file to each node in the cluster, placing it in a specific folder, renaming it, and changing the permissions. The tasks themselves weren't difficult, but by the time I had done it on the 3rd broker node I was ready for a better solution. By the end of this article we will have that better way implemented. Enter CDP Recipes.
Recipes are essentially bash or python scripts that execute while the cluster is spinning up. They are analogous to "user data" when you spin up an EC2 instance, which can run commands to make initializing an instance easier so it's ready when you first sign in. Recipes run as root, so there isn't much limit to what you can do in a recipe. You can also specify at what point in the cluster creation process you want your recipe to execute, including prior to termination in case there are cleanup steps you need to perform before the cluster snuffs it:
after cluster install
before Cloudera Manager startup
after Cloudera Manager startup
before cluster termination
Official CDP Recipe Documentation: https://docs.cloudera.com/data-hub/cloud/recipes/topics/mc-creating-custom-scripts-recipes.html
So let's create a recipe and put it to use.
Step 0: Decide what your recipe needs to do
My motivation was to automate the steps necessary for getting a Streams Messaging data hub ready for CDC capture from MySQL. That entire process is detailed in this article, but it amounts to uploading a file and changing the permissions. As a shell script, it would look something like this: download the file, move it to the right folder, rename it, and change the permissions. Remember, recipes run as root, so you don't need to include sudo instructions.
#!/bin/bash
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar -P /var/lib/kafka_connect_jdbc/
mv /var/lib/kafka_connect_jdbc/mysql-connector-java-8.0.29.jar /var/lib/kafka_connect_jdbc/mysql-connector-java.jar
chmod 644 /var/lib/kafka_connect_jdbc/mysql-connector-java.jar
Step 1: Create a Recipe
From the CDP Management Console, navigate to Recipes, under Shared Resources. Then click Register Recipe.
Next, enter the name of your recipe in the first box, which contains the text "Enter Environment Name". Don't ask me why it says 'Environment' rather than 'Recipe'; I don't have a good answer. Maybe when you read this it will have changed to something more intuitive.
Then select the type of recipe, which is really just a way of defining when your recipe will run. Details about each type can be found in the documentation. This particular set of actions needs Kafka services to be restarted to take effect, so I've chosen to have my recipe run prior to Cloudera Manager starting, because that will be prior to Kafka having been started.
A description is optional, but probably a best practice as you collect more recipes or if you expect other people on your team to use them.
Lastly, either upload your script or paste it in as plain text, and then click Register.
Step 2: Create a new Data Hub using your Recipe
My recipe is designed to work with a Streams Messaging data hub, so we'll have to create one. But there is a new step that we must take before we click the Provision Cluster button.
Under Advanced Options (#1 in the diagram), click on Cluster Extensions (#2 in the diagram).
Our recipe needs to run on the broker nodes, so let's attach the recipe to brokers and core brokers. In the box labeled "Please select a recipe" (#3 in the diagram) you can start typing the name of your recipe until you find it. Once you have your recipe selected, click Attach (#4 in the diagram). It will then become a chip under the Attached Recipes section for that node type (#5 in the diagram). Do this for all the node types upon which you want your recipe applied.
Now provision your cluster.
Step 3: Verify the Results
To validate that your recipe did what you needed it to do, the most direct method is to SSH into each node and check if the files are located where you expect them with the proper permissions, since that's the crux of what this recipe did (a sketch of this check follows at the end of this article). Here you can see that the jar file is in the /var/lib/kafka_connect_jdbc folder, has been renamed to mysql-connector-java.jar, and has had the permissions changed to 644... which is exactly what the recipe was supposed to have done.
The Thrilling Conclusion
Without much effort at all, you should now be able to create & utilize a recipe when spinning up a data hub. I've shared how I used a recipe; let me know how you are using recipes by commenting below. I'm sure there is some really cool stuff you can do with recipes, so don't be afraid to share what you've been able to do.
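For reference, the Step 3 check on a single broker node boils down to something like this. A minimal sketch, assuming you SSH in with whatever user and key your environment was provisioned with; the hostname is a placeholder:
ssh -i ~/path/to/your-key.pem <ssh-user>@<broker-fqdn>
ls -l /var/lib/kafka_connect_jdbc/
# expect: -rw-r--r-- ... mysql-connector-java.jar   (renamed by the recipe, permissions 644)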
07-27-2022
08:20 AM
Seems like it's not using the schema name field when building the insert statement. Are you specifying the database to use in your DBCP connection pool service? Also, user/users are reserved words in MySQL, so I would expect problems with that table name. Creating the table with the name wrapped in backticks might work.
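Something along these lines, as a sketch; the database name, user, and columns are made up for illustration:
# quoting the reserved table name with backticks when creating it in MySQL
mysql -u someuser -p somedb -e 'CREATE TABLE `users` (id INT PRIMARY KEY, name VARCHAR(100));'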
07-15-2022
09:01 AM
4 Kudos
Pulling Change Data Capture (aka CDC) logs off a database can be challenging, but it is made easier with tools like Debezium and Kafka Connect. It can still be challenging, but Cloudera's Streams Messaging Manager has an interface to help simplify the setup process.
At its core, Kafka Connect simply takes data sources (think: relational database, message queue, syslogs, etc.) and publishes that data to a kafka topic. It can also read data from a kafka topic and sink that data to another location (think: S3, HTTP endpoint, database, etc.). It connects sources to sinks using kafka as the intermediate layer. For our purposes, we will set up a connection to read CDC logs off a MySQL instance and publish that data to kafka.
Pre-requisites:
CDP Public Cloud environment
A Streams Messaging data hub (light duty is fine)
A MySQL instance that you can administer & restart
Helpful documents: Installing MySQL on Linux
Configuring MySQL
We'll need to do some basic setup on our MySQL instance, which is most easily done via ssh. You'll need to grab the hostnames of the broker nodes on your Streams Messaging data hub in order to create the users correctly.
SSH into your MySQL host and connect to the MySQL instance:
mysql -uroot -p
We need to create a user for each broker node in our data hub, plus a database that we'll connect into later. Feel free to use a more secure password if that's your thing:
create user cdc_user@'cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
create user cdc_user@'cnelson2-streams-corebroker1.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
create user cdc_user@'cnelson2-streams-corebroker2.se-sandb.a465-9q4k.cloudera.site' identified with mysql_native_password by 'Password1#';
Next let's create a database and a table to demonstrate all this magic in action:
create database cdc_test;
use cdc_test;
create table t (
id bigint(20) auto_increment,
ts timestamp default current_timestamp on update current_timestamp,
primary key (id));
The final MySQL setup step we need to perform is to enable binary logging, which is done by adding these lines to /etc/my.cnf under the [mysqld] section. Any other settings in that file should not be touched.
[mysqld]
server_id = 1
log_bin = delta
binlog_format = row
binlog_row_image = FULL
server_id is somewhat arbitrary, and log_bin is the prefix for the actual binlog files that get written to disk, but can really be whatever you like. binlog_format and binlog_row_image control the CDC payload. MySQL must be restarted in order for those changes to take effect:
systemctl restart mysqld
You'll want to make sure your MySQL instance allows inbound traffic on port 3306 from each of the nodes in your Streams Messaging data hub.
MySQL jar Setup
Kafka Connect requires you to upload a jar file to enable MySQL connectivity. You must copy the jar file to each of the nodes in the Streams Messaging data hub. Kafka Connect expects the jar to be in the /var/lib/kafka_connect_jdbc folder, expects it to be named mysql-connector-java.jar, and requires 644 permissions. You'll need superuser privileges to copy the file into this location. You may find it easier to create a recipe to automate these tasks at data hub spin-up.
sudo -i
cd /var/lib/kafka_connect_jdbc
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.29/mysql-connector-java-8.0.29.jar
mv mysql-connector-java-8.0.29.jar mysql-connector-java.jar
chmod 644 mysql-connector-java.jar
How did we know it needed to be in that location with that exact name? It can be found in the documentation (https://docs.cloudera.com/runtime/7.2.15/kafka-connect/topics/kafka-connect-connector-debezium-mysql.html), but it can also be found within Cloudera Manager for your Streams Messaging data hub, under the Kafka Configuration. Once the jars are in place, you will need to restart Kafka services within Cloudera Manager.
Configuring Kafka Connect
From your Streams Messaging data hub, open up the Streams Messaging Manager UI and navigate to Connect on the left-hand navigation bar. Then click the button to create a New Connector, and select the MySqlConnector (#2 pictured below) as a Source Template (#1 pictured below).
This will open up a configuration pane with a series of key/value pairs for the connector configuration. The default configuration is probably enough for a cluster without any security or Kerberos in place, but CDP Public Cloud is fully secured, so we will need to add quite a bit to this configuration. You can set these config parameters manually in this interface, or you can upload an existing configuration, which is what we'll do here. Click Import Connector Configuration (#3 pictured above) and paste the below JSON into the window, making the necessary changes to reflect your cluster & MySQL details.
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"name": "CDC-Test",
"database.history.kafka.topic": "schema-changes.inventory-mysql",
"database.history.kafka.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.producer.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.consumer.bootstrap.servers": "${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS}",
"database.history.producer.ssl.truststore.password": "${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}",
"database.history.consumer.ssl.truststore.password": "${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}",
"database.history.producer.ssl.truststore.type": "jks",
"database.history.consumer.ssl.truststore.type": "jks",
"database.history.producer.ssl.truststore.location": "${cm-agent:ENV:CONF_DIR}/cm-auto-global_truststore.jks",
"database.history.consumer.ssl.truststore.location": "${cm-agent:ENV:CONF_DIR}/cm-auto-global_truststore.jks",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.kerberos.service.name": "kafka",
"database.history.consumer.sasl.kerberos.service.name": "kafka",
"database.history.producer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"${cm-agent:ENV:CONF_DIR}/kafka.keytab\" principal=\"kafka/${cm-agent:ENV:HOST}@SE-SANDB.A465-9Q4K.CLOUDERA.SITE\";",
"database.history.consumer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"${cm-agent:ENV:CONF_DIR}/kafka.keytab\" principal=\"kafka/${cm-agent:ENV:HOST}@SE-SANDB.A465-9Q4K.CLOUDERA.SITE\";",
"database.allowPublicKeyRetrieval": "true",
"database.user": "cdc_user",
"database.password": "Password1#",
"database.include.list": "cdc_test",
"database.server.id": "184054",
"database.server.name": "cdc_test",
"database.hostname": "18.219.70.232",
"database.port": "3306",
"tasks.max": "1"
}
There are literally a zillion parameters you can supply when creating the connector, depending on how sophisticated you want your flow to be. What we have here is a "basic" configuration for a kerberized kafka cluster, but Debezium is much more powerful than that. It is left as an exercise to the reader to explore some of the wilder features, like inline field transformations. Many of the database configuration settings are self-explanatory, but further information can be found in the Debezium MySQL Connector documentation: https://debezium.io/documentation/reference/1.8/connectors/mysql.html#mysql-required-connector-configuration-properties
database.user & database.password: the user you created earlier
database.include.list: the database you want to track changes on
database.server.id: really an arbitrary value that serves as a unique ID for the connector
database.server.name: the name that will get attached to the kafka topics that get created
database.hostname & database.port: the address & port of your MySQL instance
Security Setup within Kafka Connect
Documentation is helpful, but knowing how to set the security-related parameters is something of a dark art. Here is how you can find the appropriate settings for your cluster. It will require privileged access to read these locations, so make sure you can gain root access.
jaas.config
Inspect the contents of the jaas.conf file to find the correct jaas.config settings:
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_BROKER/jaas.conf
Note the wildcard in the path. Each cluster & each node within the cluster will have a different prefix in that file path. The file has several sections, but our interest is in the Client section near the bottom of the file.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/var/run/cloudera-scm-agent/process/1546337495-kafka-KAFKA_BROKER/kafka.keytab"
principal="kafka/cnelson2-streams-corebroker0.se-sandb.a465-9q4k.cloudera.site@SE-SANDB.A465-9Q4K.CLOUDERA.SITE";
};
Everything inside those braces is what we'll use for our jaas.config. You'll note again the path, which in my case includes 1546337495-kafka-KAFKA_BROKER and is specific to each node. We're only able to supply one keytab here, so we need a way to accommodate the fact that the path is slightly different on each node. In a production deployment you would likely put your keytabs in a "less volatile" location, but for our purposes we will leverage an environment variable for this path: ${cm-agent:ENV:CONF_DIR}. Note that the principal also references an individual node of my cluster. We can replace that with another environment variable named ${cm-agent:ENV:HOST}.
truststore, keystore, security.protocol, & kerberos.service.name
These values can all be found in the same location, and combined with grep you can get the individual values you'll need. Several of these will make use of similar environment variable swaps for the paths:
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.location
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.keystore.location
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep security.protocol
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep sasl.kerberos.service.name
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.type
cat /run/cloudera-scm-agent/process/*-kafka-KAFKA_CONNECT/connect-distributed.properties | grep ssl.truststore.password
truststore.password
The truststore.password requires a different environment variable called ${cm-agent:ENV:CONNECT_SSL_SERVER_TRUSTSTORE_PASSWORD}. Again, in a production deployment you might not handle your keystore & truststore this way, but it works well for evaluation purposes.
bootstrap.servers
Debezium is going to write the CDC logs & other CDC metadata to kafka topics, so we need to specify our kafka brokers. The Streams Messaging data hub includes kafka, and Connect has an environment variable ${cm-agent:ENV:KAFKA_BOOTSTRAP_SERVERS} that points back to those broker addresses.
Validate the Configuration
Once the configuration is complete, you must validate it before you can deploy it. Validation will ensure the necessary jars are in place, check that the security parameters are correct, and check connectivity to the remote database. If the validation process finds any issues, they will be presented either in the configuration box, or possibly as a red alert box in the upper right corner of your browser window. Once you have a validated configuration, click Next, and then click Deploy.
See it in Action!
From the MySQL console, insert some data into the table we created earlier.
mysql> insert into t (id) values (100);
Query OK, 1 row affected (0.00 sec)
mysql> select * from t;
+-----+---------------------+
| id | ts |
+-----+---------------------+
| 100 | 2022-07-15 15:28:53 |
+-----+---------------------+
1 row in set (0.00 sec)
mysql>
If everything is working, we should see a corresponding message in a kafka topic named from the database.server.name property, the source database name, and the table name. So in our case the topic will be named cdc_test.cdc_test.t, and we see it in Streams Messaging Manager.
Using the data explorer in Streams Messaging Manager, we can inspect the actual CDC payload. It is a JSON object with a lot of useful information, but of particular interest to us is the payload key, which includes a before & after view of the row. Since we inserted a new row, the before version is null and the after version has the data we inserted.
Closing Thoughts
Once you have a steady stream of CDC log data, you're off to the races. A common pattern would be to use NiFi to consume the topic and apply transformations, but you could instead use a Kafka Connect sink connector to read the topic and send it to a new destination. You've already done the hard part, which is to get a stream of database changes. Now go do something cool with it!
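If you'd rather peek at the payload from a broker node's command line instead of the SMM data explorer, the console consumer works too. A minimal sketch, assuming an SSL client config like the kafka-ssl.config described in my Kafka console producer & consumer article; the broker hostnames are placeholders:
kafka-console-consumer --bootstrap-server <broker0>:9093,<broker1>:9093,<broker2>:9093 \
  --topic cdc_test.cdc_test.t \
  --consumer.config kafka-ssl.config \
  --from-beginning
# Each record is JSON; payload.before is null for an insert and payload.after holds the new row.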
05-16-2022
11:25 AM
3 Kudos
We all love Hue. However, it isn't always the perfect tool for doing a lot of querying, so you've probably looked toward some 3rd party tools for database connectivity. There are many such tools, but DBeaver has a very capable free version and is among the most popular. Connecting it to a Hive virtual warehouse isn't difficult, but there are a few gotchas that can make it frustrating. So let's conquer those obstacles.
Here is a video walkthrough of the process.
Provision a Hive Virtual Warehouse
Log into a CDP instance
Navigate to Data Warehouse
Enable your Data Warehouse Environment & Database Catalog
Create a New Virtual Warehouse
Give your virtual warehouse a unique name
Select Hive
Select your database catalog from the dropdown
SSO may be enabled or disabled, the choice is yours!
Set Availability Zone and User Groups per your requirements or leave as-is
Pick the size (aka decide how much money you want to spend)
The remaining options are going to depend on your needs, but the defaults are fine for our purposes.
Click Create. Expect approximately 5 minutes to create your virtual warehouse.
Download the JDBC Jar
From your virtual warehouse tile, click on the kebab icon in the upper right. You'll find all sorts of fun options under there, but we're primarily interested in the Download JDBC Jar option, which will download the Hive jar to your local machine. You can leave it in your Downloads folder, or move it to wherever you like to store your jars. It will be named similar to this:
hive-jdbc-3.1.0-SNAPSHOT-standalone.jar
Copy the JDBC URL
Again, from the kebab icon in your virtual warehouse tile, copy the JDBC URL. This URL has all the necessary information to make the connection, and should be of the form:
jdbc:hive2://hs2-<virtual warehouse name>.<cdp environment name>.a465-9q4k.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;
Create a New Hive Connection in DBeaver
Next, we will create a new connection within DBeaver.
Create a new connection in DBeaver, selecting Apache Hive as the database driver.
For the Host, paste the JDBC URL you copied earlier. You'll need to remove the jdbc:hive2:// prefix, since the DBeaver JDBC URL template automatically adds that in for you (see the example below).
Leave the port empty
Set the Database/Schema to the name of the database you want to connect to (e.g. default)
Username/Password:
If the warehouse is SSO-enabled, use your SSO credentials.
If the warehouse is not SSO-enabled, use your CDP workload credentials.
Add the Hive jar
Click on the Edit Driver Settings button
DBeaver installs with a driver for Hive, but you may find it to not be fully compatible with your virtual warehouse. Open the Libraries tab and Delete the existing drivers to avoid any conflict.
Click Add File to add the Hive jar you downloaded earlier.
Click OK
Click Test Connection and verify that you can connect. If your virtual warehouse is SSO-enabled, DBeaver will open a browser tab to allow you to authenticate if you aren't already so authenticated.
Click Finish to save the new connection.
Once the connection is created, you can navigate the database, tables, columns, etc, as well as query your data. Congratulations, you did it (and there was much rejoicing).
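To make the Host value from the steps above concrete, it is everything after the jdbc:hive2:// prefix of the copied URL. A quick sketch with made-up warehouse and environment names:
URL='jdbc:hive2://hs2-my-vw.my-env.a465-9q4k.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;'
echo "${URL#jdbc:hive2://}"
# hs2-my-vw.my-env.a465-9q4k.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;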
Tips
If you have connectivity issues even after successfully testing your connection, try an Invalidate/Reconnect or a full Disconnect + Reconnect to reset the connection.
If a query seems to take a long time to run, check the status of your virtual warehouse; it is likely that it was stopped and needs to restart to execute the query.
You may need your cloud firewall rules set to allow traffic on port 443 from your IP address.
Credit to @Royles (Chris Royles) for his similar article about connecting DBeaver to Phoenix/OpDB.
05-06-2022
09:32 AM
Great article! With the introduction of the OpDB experience (aka COD), there are some worthwhile updates to make here. Namely, there is a direct download link to the Phoenix Thin client, and the Phoenix Thin URL is provided for you (no need to determine the Knox host). With that comes a complication on the DBeaver side. The easiest thing to do is to blank out the URL Template, which allows you to directly paste the Thin URL provided by COD, making sure to update your workload password. Alternatively, you can parameterize the URL & secure the password by changing the URL template to look like this, along with unchecking the "No Authentication" box:
jdbc:phoenix:thin:url={host};avatica_user={user};avatica_password={password}
The complication is that you would need to copy only the portion of the URL up to the user section to get the host.