Imagine a scenario where you're using CDP to host your Kafka environment, but you're also using Snowflake and need to get data from here to there with as little effort as possible. It turns out that CDP plays very nicely with 3rd-party vendors such as Snowflake. Let's find out how, through Streams Messaging Manager & Kafka Connect (KConnect).
Prerequisites:
Installing the Snowflake Sink Connector
The default setup for a Streams Messaging data hub in CDP includes many Kafka Connect sources & sinks, but obviously every connector can't be included. To use a 3rd-party connector, you need to download the jar(s) to each broker node in your Streams Messaging cluster. Where exactly to put the jar files depends on what sort of jar it is. If the connector is completely contained in one "fat" jar, it can go directly into the plugin path defined in Cloudera Manager, which is set to /var/lib/kafka by default.
If there are dependencies on other jars, then all of the jars (including the fat jar, which obviously isn't quite as fat as you need it to be) need to be placed together in a subfolder under the plugin path. Either way, the jars themselves need 644 permissions; the subfolder additionally needs execute rights, so it gets 755 permissions. Root access is required for these actions, so it might make more sense to build a recipe that performs them for you when the Streams Messaging data hub is created (i.e. if you are unable to sudo on the broker nodes); a sketch of such a script follows the commands below. If your cluster is already running, you'll need to perform these steps manually on each broker node, adjusting the version numbers to suit your needs. See the Cloudera documentation for more info on KConnect connectors.
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.8.0/snowflake-kafka-connector-1.8.0.jar -P /var/lib/kafka/snowflake/
wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -P /var/lib/kafka/snowflake/
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -P /var/lib/kafka/snowflake/
chmod 755 /var/lib/kafka/snowflake
chmod 644 /var/lib/kafka/snowflake/*
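For the recipe route mentioned above, a minimal sketch of such a script might look like the following; it simply repeats the manual steps (same jar versions, URLs, and plugin path, all of which you should adjust to your needs):
#!/bin/bash
# sketch of a data hub recipe that stages the Snowflake connector jars under the
# default Kafka Connect plugin path with the permissions described above
set -euo pipefail
PLUGIN_DIR=/var/lib/kafka/snowflake
mkdir -p "$PLUGIN_DIR"
# the fat connector jar plus its Bouncy Castle FIPS dependencies
wget -q https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.8.0/snowflake-kafka-connector-1.8.0.jar -P "$PLUGIN_DIR"
wget -q https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -P "$PLUGIN_DIR"
wget -q https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -P "$PLUGIN_DIR"
# 755 on the subfolder, 644 on the jars themselves
chmod 755 "$PLUGIN_DIR"
chmod 644 "$PLUGIN_DIR"/*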
Once the jars are in place on every broker node, you'll need to restart the Kafka Connect roles from within Cloudera Manager: go to Clusters --> Kafka, find the Kafka Connect roles under the Instances tab, check all of them, and click Restart.
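If you'd rather script that restart than click through the UI, the Cloudera Manager API can do it. This is only a sketch under several assumptions (CM reachable over TLS on port 7183, API version v41, a Kafka service named kafka, admin credentials, and jq available on the machine running it); adjust everything to match your environment:
# collect the names of the KAFKA_CONNECT roles, then issue a restart command for them
CM="https://<cm-host>:7183/api/v41"
CLUSTER="<cluster-name>"
ROLES=$(curl -sk -u admin:<password> "$CM/clusters/$CLUSTER/services/kafka/roles" \
  | jq -c '[.items[] | select(.type=="KAFKA_CONNECT") | .name]')
curl -sk -u admin:<password> -X POST -H "Content-Type: application/json" \
  -d "{\"items\": $ROLES}" \
  "$CM/clusters/$CLUSTER/services/kafka/roleCommands/restart"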
Once the roles have restarted, open up the Kafka Connect UI within Streams Messaging Manager, start a New Connection, and navigate to the Sink Templates tab. You should now see a SnowflakeSinkConnector, ready for you to sink data through. So let's put it to work.
If you don't see the SnowflakeSinkConnector here, verify that the jars are on all of your broker nodes, that the permissions are correct, and that the Kafka Connect roles have been restarted across the entire cluster.
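You can also check from the command line: the Kafka Connect REST API lists the plugins each worker has loaded. A hedged sketch follows; the port and any required authentication depend on how your cluster is secured (28083 is a common default), so confirm the Kafka Connect settings in Cloudera Manager:
# ask any Connect worker which connector plugins it has loaded
curl -sk "https://<connect-worker-host>:28083/connector-plugins" | grep -i snowflake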
Preparing Snowflake to Receive Sink'd Data
This is not intended to be a complete treatise on Snowflake operations; it's a simple sample case that lands data in a Snowflake table to demonstrate the Kafka Connect functionality. To allow remote connections as a user from something like Kafka Connect, public & private keys must be created and attached to your Snowflake user. This is all detailed in the Snowflake docs (https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication), but we'll go through it here now very quickly and with much hand-waving.
From your local machine, run these steps to create the public & private keys:
openssl genrsa -out snowflake_key.pem 2048
openssl rsa -in snowflake_key.pem -pubout -out snowflake_key.pub
If you need to instead use an encrypted private key, use these steps:
openssl genrsa -out snowflake_key 4096
openssl rsa -in snowflake_key -pubout -out snowflake_key.pub
openssl pkcs8 -topk8 -inform pem -in snowflake_key -outform PEM -v2 aes-256-cbc -out snowflake_key.p8
It will ask you for a passphrase which you can set to whatever you like. You'll use this when you configure the connector.
You'll need the public key to attach to your user within Snowflake, so cat the contents of snowflake_key.pub and copy everything between "-----BEGIN PUBLIC KEY-----" and "-----END PUBLIC KEY-----". Use that public key when creating or altering your user (via the user's RSA_PUBLIC_KEY property, as described in the Snowflake docs linked above). Be sure to remove all the line feeds so that your public key is a single line of text (it's ok if the Snowflake UI wraps the line); the one-liner below takes care of that.
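A quick way to get that single-line form, assuming the snowflake_key.pub generated by the openssl commands above:
# print the public key as one line, stripping the BEGIN/END lines and all line feeds
grep -v "PUBLIC KEY" snowflake_key.pub | tr -d '\n'; echo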
Next we'll set up some Snowflake objects to receive data from the sink connector. Run these commands in Snowflake to create the necessary objects & permissions, making sure to change the username to match the Snowflake user you created above. There are likely multiple ways to achieve this end goal in Snowflake, and all of this might not even be necessary. Consult your local Snowflake Expert for more information.
use role accountadmin;
create database sink_db;
create role sink_role;
create schema sink_db.sink_schema;
create table sink_table (record_metadata variant, record_content variant);
create warehouse sink_warehouse with warehouse_size = 'XSMALL' warehouse_type = 'STANDARD' auto_suspend = 300 auto_resume = true;
grant usage on database sink_db to role sink_role;
grant usage on schema sink_db.sink_schema to role sink_role;
grant create table on schema sink_db.sink_schema to role sink_role;
grant create stage on schema sink_db.sink_schema to role sink_role;
grant create pipe on schema sink_db.sink_schema to role sink_role;
grant ownership on table sink_table to role sink_role;
grant usage on warehouse sink_warehouse to role sink_role;
grant select on table sink_table to role public;
grant role sink_role to user cnelson;
alter user cnelson set default_role = sink_role;
The net result is that you now should have an empty table with two variant columns, just waiting to have some Kafka data sunk into it.
Configuring the Snowflake Sink Connector
Configuring the Snowflake connector is fairly straightforward, but you may want to import this configuration boilerplate which will get you 85% of the way there.
{
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max": "4",
  "topics": "sample.topic",
  "snowflake.topic2table.map": "sample.topic:sink_table",
  "buffer.count.records": "10000",
  "buffer.flush.time": "60",
  "buffer.size.bytes": "500000",
  "snowflake.url.name": "",
  "snowflake.user.name": "",
  "snowflake.private.key": "",
  "snowflake.database.name": "SINK_DB",
  "snowflake.schema.name": "SINK_SCHEMA",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
If you used an encrypted private key when you created your public/private keys you will also need to add this configuration item with the passphrase you used. You can either carefully add it to the json before you import, or use the (+) button to add the property after import.
"snowflake.private.key.passphrase": ""
That leaves three configuration items that need to be completed: snowflake.url.name (the URL of your Snowflake account), snowflake.user.name (the Snowflake user you created above), and snowflake.private.key (the private key, flattened to a single line as shown below).
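As with the public key, strip the header, footer, and line feeds so the private key is one continuous line. A quick one-liner, assuming the key files generated earlier (swap in snowflake_key.p8 if you created an encrypted key):
# print the private key as one line for snowflake.private.key
grep -v "PRIVATE KEY" snowflake_key.pem | tr -d '\n'; echo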
Once the configuration is complete, click the Validate button and hope for the best. Individual field errors will show inline, but jar-related errors are shown above the configuration section. If you see NoClassDefFound-type errors, check that you have the Snowflake connector jar on every broker node, in the correct path, with 644 permissions. If you get a Bouncy Castle-related error, check that you have the two encryption-related jars (and the Snowflake fat jar) in a subfolder under /var/lib/kafka, that the directory permissions are 755, and that you restarted the Kafka Connect roles after making those changes. Once the configuration is valid, click Next and deploy your connector. And then watch nothing happen, because we don't have any data flowing yet.
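If you'd like confirmation beyond the SMM UI that the connector and its task(s) came up as RUNNING, the Kafka Connect REST API has a status endpoint (same host, port, and authentication caveats as earlier; <connector-name> is whatever you named the connector in SMM):
# check the state of the connector and its tasks
curl -sk "https://<connect-worker-host>:28083/connectors/<connector-name>/status"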
Testing the Connector
The easiest way (probably) to test the connector is by using the kafka-console-producer via ssh'ing into one of the broker nodes. And yes, I completely hand-waved over the complications of configuring the SSL properties for the console producer. For info on how to set that up, see this article (https://community.cloudera.com/t5/Community-Articles/Configuring-Kafka-Console-Producer-Consumer-on-...)
The console producer only requires minimal information to produce messages, primarily the brokers and the topic to which you want to publish. The producer.config file handles the SSL configuration and is detailed in the article linked above. The topic we're using is the same topic we listed in the connector configuration's topics parameter and in snowflake.topic2table.map.
kafka-console-producer --broker-list cnelson2-streams-snowflake-corebroker0.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker1.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker2.se-sandb.a465-9q4k.cloudera.site:9093 \
--topic sample.topic \
--producer.config kafka-ssl.config
Once it gets rolling, it quietly waits for user input to publish to the topic; our KConnect Snowflake sink connector will then consume those messages and send them to our Snowflake table. Let's see it in action.
Messages are produced....
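If you'd rather not type messages interactively, a couple of made-up JSON records can be piped straight into the same producer command (broker list and SSL config as above; the payloads are purely illustrative):
# reuse the broker list from the command above
BROKERS="cnelson2-streams-snowflake-corebroker0.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker1.se-sandb.a465-9q4k.cloudera.site:9093,cnelson2-streams-snowflake-corebroker2.se-sandb.a465-9q4k.cloudera.site:9093"
# pipe two sample JSON records into the topic
printf '%s\n' '{"id": 1, "msg": "hello snowflake"}' '{"id": 2, "msg": "hello again"}' \
  | kafka-console-producer --broker-list "$BROKERS" --topic sample.topic --producer.config kafka-ssl.config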
And eventually you can see them in Snowflake. Note that the records will show up in your Snowflake table in "near real time," which in the real world means within a minute or so, since the connector buffers records based on settings like the buffer.flush.time of 60 seconds we used in the configuration (along with a number of other factors).
From here, it's up to you to monetize your new skills.