  • Create Kafka topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic kafka_hive_topic
  • Create the Hive table (update the Kafka broker hostname below).
CREATE EXTERNAL TABLE kafka_hive_table
  (`Country Name` string , `Language` string,  `_id` struct<`$oid`:string>)
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES
  ("kafka.topic" = "kafka_hive_topic", "kafka.bootstrap.servers"="c2114-node2.labs.com:6667");
  • Download the sample JSON data.
wget -O countries.json 'https://github.com/ozlerhakan/mongodb-json-files/blob/master/datasets/countries.json?raw=true'
  • Produce data into the Kafka topic (use the same broker hostname as in the table definition).
cat countries.json | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list c2114-node2.labs.com:6667 --topic kafka_hive_topic
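The console producer sends one message per input line, so the number of lines in countries.json is a reasonable estimate of how many rows the Hive table should return afterwards. The sketch below assumes the file holds one JSON document per line; count_lines is a hypothetical helper name, not part of Kafka or Hive.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): print the number of lines in a file.
# Assumption: the input holds one JSON document per line, so the line count
# approximates the number of messages the console producer will send.
count_lines() {
  wc -l < "$1" | tr -d '[:space:]'
}

# Usage, with the file name from the download step:
#   count_lines countries.json
```

If the result differs noticeably from the count(*) that Hive reports later, the file may not be line-delimited or the topic may already have held older messages.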
  • Describe the table (to see the additional Kafka-specific columns).
describe kafka_hive_table;
+---------------+----------------------+--------------------+
|   col_name    |      data_type       |      comment       |
+---------------+----------------------+--------------------+
| country name  | string               | from deserializer  |
| language      | string               | from deserializer  |
| _id           | struct<$oid:string>  | from deserializer  |
| __key         | binary               | from deserializer  |
| __partition   | int                  | from deserializer  |
| __offset      | bigint               | from deserializer  |
| __timestamp   | bigint               | from deserializer  |
+---------------+----------------------+--------------------+
  • Run some sample queries.
SELECT count(*) from kafka_hive_table;
+--------+
|  _c0   |
+--------+
| 21640  |
+--------+

SELECT `__partition`, max(`__offset`), CURRENT_TIMESTAMP FROM kafka_hive_table GROUP BY `__partition`, CURRENT_TIMESTAMP;
+--------------+--------+--------------------------+
| __partition  |  _c1   |           _c2            |
+--------------+--------+--------------------------+
| 0            | 21639  | 2019-02-07 08:49:50.918  |
+--------------+--------+--------------------------+

select * from kafka_hive_table limit 10;

+--------------------------------+----------------------------+--------------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| kafka_hive_table.country name  | kafka_hive_table.language  |         kafka_hive_table._id         | kafka_hive_table.__key  | kafka_hive_table.__partition  | kafka_hive_table.__offset  | kafka_hive_table.__timestamp  |
+--------------------------------+----------------------------+--------------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| Afrika                         | af                         | {"$oid":"55a0f1d420a4d760b5fbdbd6"}  | NULL                    | 0                             | 0                          | 1549529251002                 |
| Oseanië                        | af                         | {"$oid":"55a0f1d420a4d760b5fbdbd7"}  | NULL                    | 0                             | 1                          | 1549529251010                 |
| Suid-Amerika                   | af                         | {"$oid":"55a0f1d420a4d760b5fbdbd8"}  | NULL                    | 0                             | 2                          | 1549529251010                 |
| Wêreld                         | af                         | {"$oid":"55a0f1d420a4d760b5fbdbd9"}  | NULL                    | 0                             | 3                          | 1549529251011                 |
| አፍሪካ                           | am                         | {"$oid":"55a0f1d420a4d760b5fbdbda"}  | NULL                    | 0                             | 4                          | 1549529251011                 |
| ኦሽኒያ                           | am                         | {"$oid":"55a0f1d420a4d760b5fbdbdb"}  | NULL                    | 0                             | 5                          | 1549529251011                 |
| ዓለም                            | am                         | {"$oid":"55a0f1d420a4d760b5fbdbdc"}  | NULL                    | 0                             | 6                          | 1549529251011                 |
| ደቡባዊ አሜሪካ                      | am                         | {"$oid":"55a0f1d420a4d760b5fbdbdd"}  | NULL                    | 0                             | 7                          | 1549529251011                 |
| أمريكا الجنوبية                | ar                         | {"$oid":"55a0f1d420a4d760b5fbdbde"}  | NULL                    | 0                             | 8                          | 1549529251011                 |
| أمريكا الشمالية                | ar                         | {"$oid":"55a0f1d420a4d760b5fbdbdf"}  | NULL                    | 0                             | 9                          | 1549529251011                 |
+--------------------------------+----------------------------+--------------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
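The __timestamp column holds the Kafka record timestamp in milliseconds since the Unix epoch, which is hard to read at a glance. A minimal sketch for converting one value on the command line follows; it assumes GNU date (for the -d "@seconds" form), and ms_to_utc is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): convert epoch milliseconds,
# as found in the __timestamp column, to a UTC date string.
# Assumption: GNU date is available (the -d "@SECONDS" syntax is GNU-specific).
ms_to_utc() {
  # Integer division drops the millisecond part and leaves epoch seconds.
  local secs=$(( $1 / 1000 ))
  date -u -d "@${secs}" '+%Y-%m-%d %H:%M:%S'
}

# Usage, with the timestamp of the first row shown above:
#   ms_to_utc 1549529251002    # prints 2019-02-07 08:47:31
```

That value falls about two minutes before the CURRENT_TIMESTAMP in the earlier query output, which matches the data having been produced shortly before the queries were run.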
Comments
Hello,

I have a question: I have a Cloudera cluster. How do I configure this Kafka storage handler within the cluster so that it integrates with Hive, Spark, and the other services? Is any special configuration required in Cloudera Manager?