Created on 03-26-2025 10:01 PM - edited 04-01-2025 03:04 AM
Hive-Kafka Integration steps
This procedure is well suited for a Kerberos- and Auto-TLS-enabled cluster.
1. Create a client.properties file (for example, /tmp/client.properties, as used in the commands below) with the following security settings:
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/run/cloudera-scm-agent/process/71-kafka-KAFKA_CONNECT/cm-auto-global_truststore.jks
ssl.truststore.password=xxxxxxx
ssl.keystore.location=/var/run/cloudera-scm-agent/process/71-kafka-KAFKA_CONNECT/cm-auto-host_keystore.jks
ssl.keystore.password=xxxxxxx
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;
2. Export the JAAS configuration file from its path so that the Kafka command-line tools can pick it up:
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
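The exported file above is a standard JAAS configuration. A minimal /tmp/jaas.conf matching the ticket-cache setup in client.properties could look like the following sketch (a keytab-based login would set keyTab and principal options instead):

```
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
```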
Topic creation in Kafka (obtain a valid Kerberos ticket first if the cluster is Kerberos-enabled):
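For example, a ticket can be obtained with kinit before running the Kafka CLI tools (the keytab path and principal below are placeholders; substitute your own):

```shell
# Obtain a Kerberos ticket; keytab path and principal are placeholders
kinit -kt /path/to/user.keytab user@EXAMPLE.COM
# Verify the ticket was granted
klist
```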
kafka-topics --create --bootstrap-server xxxx.xxxx:9093 --topic topicname --command-config /tmp/client.properties
kafka-topics --bootstrap-server xxxx.xxxx:9093 --list --command-config /tmp/client.properties
kafka-console-producer --broker-list xxxx.xxxx:9093 --topic topicname --producer.config /tmp/client.properties
Provide input records to the producer according to your needs; for example:
{"type": "next", "key1": "sachin", "key2": "ramu", "timestamp": "2024-01-01 00:00:00", "attr": "goodtime"}
kafka-console-consumer --topic topicname --bootstrap-server xxxx.xxxx:9093 --consumer.config /tmp/client.properties --from-beginning
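Since the Hive table created later maps columns with the JsonSerDe, each produced record must be valid JSON. A quick local sanity check before producing (assumes python3 is on the PATH):

```shell
# Validate that a record parses as JSON before sending it to the topic;
# prints "valid JSON" when the record parses
echo '{"type": "next", "key1": "sachin", "key2": "ramu", "timestamp": "2024-01-01 00:00:00", "attr": "goodtime"}' \
  | python3 -m json.tool > /dev/null && echo "valid JSON"
```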
Log in to Ranger with the required credentials to reach the user interface shown below.
From the Service Manager window above, open the cm_kafka link under the Kafka folder icon to reach the screen below, then edit the all-topic policy from the Policy Name column.
After clicking the Edit button in the Action column, the screen below appears. Add the user (the hive user in this example) under the Select Users tab; the Permissions tab shows the permissions granted on all topics. Alternatively, create specific permissions and users by defining your own policy against the Kafka topic via the Add New Policy tab in the image above.
Note: Save the changes made in the window below for them to take effect.
Hive Part
Before creating a Hive-Kafka table, for additional security, bind the truststore, keystore, and key passwords to a credential file in HDFS; the Hive-Kafka table created below will reference these credentials instead of clear-text passwords.
hadoop credential create hive.kafka.ssl.truststore.password -value xxxx -provider jceks://hdfs/tmp/test15.jceks
hadoop credential create hive.kafka.ssl.keystore.password -value xxxxx -provider jceks://hdfs/tmp/test15.jceks
hadoop credential create hive.kafka.ssl.key.password -value xxxxx -provider jceks://hdfs/tmp/test15.jceks
hadoop credential list -provider jceks://hdfs/tmp/test15.jceks
Listing aliases for CredentialProvider: jceks://hdfs/tmp/test15.jceks
hive.kafka.ssl.key.password
hive.kafka.ssl.keystore.password
hive.kafka.ssl.truststore.password
In Cloudera Manager, go to the Hive service, search for "credentials", and disable the following setting by unchecking the box:
Generate HADOOP_CREDSTORE_PASSWORD
Repeat the same for the Hive on Tez service, and restart the stale services after saving the changes.
CREATE EXTERNAL TABLE `kafka_hive_plain_ssltexteg`(
`type` string,
`key1` string,
`key2` string,
`timestamp` string,
`attr` string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.topic" = "kafkabrownbag",
"hive.kafka.ssl.key.password"="hive.kafka.ssl.key.password",
"hive.kafka.ssl.truststore.password" = "hive.kafka.ssl.truststore.password",
"hive.kafka.ssl.keystore.password" = "hive.kafka.ssl.keystore.password",
"kafka.bootstrap.servers"="node2.hive-kafkabbdemo-sachin.coelab.cloudera.com:9093",
"kafka.security.protocol"="SASL_SSL",
"kafka.producer.security.protocol"="SASL_SSL",
"kafka.consumer.security.protocol"="SASL_SSL",
"hive.kafka.ssl.credential.keystore"="jceks://hdfs@namenodehostfqdn:portnoofnamenode/tmp/test15.jceks",
"hive.kafka.ssl.truststore.location"="hdfs:///tmp/cm-auto-global_truststore.jks",
"hive.kafka.ssl.keystore.location" = "hdfs:///tmp/cm-auto-host_keystore.jks",
"kafka.sasl.kerberos.service.name"="kafka");
"hive.kafka.ssl.key.password"="hive.kafka.ssl.key.password"
"hive.kafka.ssl.truststore.password" = "hive.kafka.ssl.truststore.password"
"hive.kafka.ssl.keystore.password" = "hive.kafka.ssl.keystore.password"
Bootstrap server of the Kafka cluster:
"kafka.bootstrap.servers"="xxxx.xxxx:9093"
"kafka.security.protocol"="SASL_SSL"
"kafka.producer.security.protocol"="SASL_SSL"
"kafka.consumer.security.protocol"="SASL_SSL"
"hive.kafka.ssl.credential.keystore"="jceks://hdfs@namenodehostfqdn:portnoofnamenode/tmp/test15.jceks"--- This path is for Non-HA cluster if its HA enabled Cluster give the Nameservices in the path instead of namenode url and port no eg :- "jceks://hdfs@Nameservicesofcluster/tmp/test15.jceks"
"hive.kafka.ssl.truststore.location"="hdfs:///tmp/cm-auto-global_truststore.jks"
"hive.kafka.ssl.keystore.location" = "hdfs:///tmp/cm-auto-host_keystore.jks"
The following table property is required for a Kerberos-enabled Kafka cluster:
"kafka.sasl.kerberos.service.name"="kafka"
Some other important table properties, created by default, are given below. Refer to the Hive Kafka storage handler documentation for more on table properties.
'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe'
'kafka.topic'='kafkabrownbag'
'kafka.write.semantic'='AT_LEAST_ONCE'
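These defaults can be adjusted after table creation with a standard ALTER TABLE statement, for example (the EXACTLY_ONCE value is shown here only as an illustration; verify the supported write semantics for your Hive version):

```sql
-- Change the write semantics on an existing Hive-Kafka table
ALTER TABLE kafka_hive_plain_ssltexteg
SET TBLPROPERTIES ('kafka.write.semantic' = 'EXACTLY_ONCE');
```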