Hive-Kafka Integration steps

This procedure is well suited for a Kerberos- and Auto-TLS-enabled cluster.

  1. To begin with, create a client.properties file with the entries shown below on the bootstrap server (the Active Controller node; check this from CM --> Kafka --> Instances --> xxxx.net.com (Active Controller)):
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/run/cloudera-scm-agent/process/71-kafka-KAFKA_CONNECT/cm-auto-global_truststore.jks
ssl.truststore.password=xxxxxxx
ssl.keystore.location=/var/run/cloudera-scm-agent/process/71-kafka-KAFKA_CONNECT/cm-auto-host_keystore.jks
ssl.keystore.password=xxxxxxx
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;

2. It is also important to export the jaas.conf file from the respective path, as given below, so that the Kafka client tools pick it up:

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
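For reference, a minimal /tmp/jaas.conf that relies on the kinit ticket cache could look like the following. This is an illustrative sketch that mirrors the sasl.jaas.config line in client.properties above; a keytab-based login entry is also possible and would need your own principal and keytab path:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};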


Topic creation in Kafka (please get a valid Kerberos ticket first if it is a Kerberos-enabled cluster)

  1. Create a topic in Kafka using the below command:
kafka-topics --create --bootstrap-server xxxx.xxxx:9093 --topic topicname --command-config /tmp/client.properties
  • After creating the topic, we can list it using the below command:
kafka-topics --bootstrap-server xxxx.xxxx:9093 --list --command-config /tmp/client.properties
  • Produce messages to the topic using the below command:
kafka-console-producer --broker-list xxxx.xxxx:9093 --topic topicname --producer.config /tmp/client.properties
  • After issuing the above command, you will get a prompt where you can type input for the Kafka topic, for example:
{"type": "next", "key1": "sachin", "key2": "ramu", "timestamp": "2024-01-01 00:00:00", "attr": "goodtime"}
  • To consume from the topic, use the below command:
kafka-console-consumer --topic topicname --bootstrap-server xxxx.xxxx:9093 --consumer.config /tmp/client.properties --from-beginning
  • While accessing Kafka topics from Hive, it is necessary to grant the respective user valid consume/produce permissions on the said topic. Use the Ranger Admin UI to provide the appropriate permissions and avoid permission-denied errors.
  • From CM --> Ranger --> Ranger Admin Web UI, the screen below will pop up:

sachinelango_1-1743496502934.png

    Login with the required credentials to get a user interface as shown below.

    sachinelango_0-1743499631022.png

    From the Service Manager window above, choose the cm_kafka link under the Kafka folder icon to get the screen below, then edit the all-topic policy from the Policy Name tab.

    sachinelango_1-1743499925371.png

    After clicking the edit button in the Action tab, the screen below will appear. Add the user (in this example, the hive user) in the Select Users tab; the Permissions tab will display the required permissions for all topics. Alternatively, we can create specific permissions and users by creating our own policy against the Kafka topic via the Add New Policy tab shown in the image above.

    Note: Save the changes made in the window below for them to take effect.

    sachinelango_0-1743500629597.png

Hive Part

Before creating a Hive-Kafka table, for additional security, we need to bind the truststore, keystore, and key passwords to a credential file in HDFS and use it against the Hive-Kafka table that we are going to create in Hive.

  1. Use the actual passwords of the keystore and truststore in the respective cluster environment, and set the key password to the same value as the keystore password:
hadoop credential create hive.kafka.ssl.truststore.password -value xxxx -provider jceks://hdfs/tmp/test15.jceks
hadoop credential create hive.kafka.ssl.keystore.password -value xxxxx -provider jceks://hdfs/tmp/test15.jceks
hadoop credential create hive.kafka.ssl.key.password -value xxxxx -provider jceks://hdfs/tmp/test15.jceks
  • After generating the above credentials, verify them with the help of the below command; you should see the following three aliases:
hadoop credential list -provider jceks://hdfs/tmp/test15.jceks
Listing aliases for CredentialProvider: jceks://hdfs/tmp/test15.jceks
hive.kafka.ssl.key.password
hive.kafka.ssl.keystore.password
hive.kafka.ssl.truststore.password
  • Change the following setting in Cloudera Manager for the Hive and Hive-on-Tez services:
In CM --> Hive, search for "credentials" and disable the setting below by unchecking its box:

Generate HADOOP_CREDSTORE_PASSWORD

Repeat the same for the Hive-on-Tez service as well, save the changes, and restart the stale services.
  • Hive table creation DDL example: 
CREATE EXTERNAL TABLE `kafka_hive_plain_ssltexteg`(
`type` string,
`key1` string,
`key2` string,
`timestamp` string,
`attr` string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.topic" = "kafkabrownbag",
"hive.kafka.ssl.key.password"="hive.kafka.ssl.key.password",
"hive.kafka.ssl.truststore.password" = "hive.kafka.ssl.truststore.password",
"hive.kafka.ssl.keystore.password" = "hive.kafka.ssl.keystore.password",
"kafka.bootstrap.servers"="node2.hive-kafkabbdemo-sachin.coelab.cloudera.com:9093",
"kafka.security.protocol"="SASL_SSL",
"kafka.producer.security.protocol"="SASL_SSL",
"kafka.consumer.security.protocol"="SASL_SSL",
"hive.kafka.ssl.credential.keystore"="jceks://hdfs@namenodehostfqdn:portnoofnamenode/tmp/test15.jceks",
"hive.kafka.ssl.truststore.location"="hdfs:///tmp/cm-auto-global_truststore.jks",
"hive.kafka.ssl.keystore.location" = "hdfs:///tmp/cm-auto-host_keystore.jks",
"kafka.sasl.kerberos.service.name"="kafka");

  • Important table properties: the credential aliases generated by the hadoop credential command above are supplied as the following table properties:
"hive.kafka.ssl.key.password"="hive.kafka.ssl.key.password"
"hive.kafka.ssl.truststore.password" = "hive.kafka.ssl.truststore.password"
"hive.kafka.ssl.keystore.password" = "hive.kafka.ssl.keystore.password"

Bootstrap server node of Kafka:
"kafka.bootstrap.servers"="xxxx.xxxx:9093"
  • Update the security protocol along with the consumer and producer protocols:
"kafka.security.protocol"="SASL_SSL"
"kafka.producer.security.protocol"="SASL_SSL"
"kafka.consumer.security.protocol"="SASL_SSL"
  • Path of the jceks file (created by the hadoop credential command) in the table property:
"hive.kafka.ssl.credential.keystore"="jceks://hdfs@namenodehostfqdn:portnoofnamenode/tmp/test15.jceks"
This path is for a non-HA cluster. If it is an HA-enabled cluster, give the nameservice in the path instead of the NameNode host and port, e.g. "jceks://hdfs@Nameservicesofcluster/tmp/test15.jceks".
  • Path of the truststore & keystore JKS files in HDFS:
"hive.kafka.ssl.truststore.location"="hdfs:///tmp/cm-auto-global_truststore.jks"
"hive.kafka.ssl.keystore.location" = "hdfs:///tmp/cm-auto-host_keystore.jks"

The following is the table property for a Kerberos-enabled Kafka cluster:

"kafka.sasl.kerberos.service.name"="kafka"

Some other important table properties, which are set by default, are given below. Refer to the link in the Reference section below for more on table properties.

'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe'
'kafka.topic'='kafkabrownbag'
'kafka.write.semantic'='AT_LEAST_ONCE'
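Once created, the table can be queried like any other Hive table; the Kafka storage handler also exposes metadata columns such as __partition, __offset, and __timestamp. A hypothetical example query against the table defined in the DDL above (the seven-day window and selected columns are illustrative only):

SELECT `__partition`, `__offset`, `type`, `key1`, `attr`
FROM kafka_hive_plain_ssltexteg
WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - INTERVAL '7' DAYS)
LIMIT 10;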

Reference