Community Articles
Find and share helpful community-sourced technical articles.
Cloudera Employee

Introduction

In this article, I'll show how to stream data into CDP Public Cloud using Cloudera Dataflow/Streaming Datahub and query the data using Cloudera Data Warehouse.

Pre-Requisites

For this exercise you'll need Cloudera Data Platform with:

  • Cloudera Data Warehouse;
  • Datahub Flow Management;
  • Datahub Streams Messaging;

This exercise and flow are based on the sensor data/pipeline in edge2ai-workshop, but it is in a modified version that I'll share in another repository.

1. Create the Streaming Table in Cloudera Data Warehouse

For this exercise we need two virtual warehouses:

  • A Hive virtual warehouse: Used only to perform the compaction process (Streaming data ingest). For this example, I've named it latam-hive;
  • A Unified Analytics Virtual Warehouse: Used to query data and visualization. Unified Analytics is a very exciting new feature for CDW customers at no extra fee!. You can learn more hereFor this example, I've named it latam-vw;
carrossoni_0-1633191232702.png

Figure 1: Virtual Warehouses used on this exercise.

 

Now I can access the Hue interface in latam-hive VW to create the table used to store the sensor data that we'll be streaming:

create database streaming;


CREATE TABLE `streaming`.`sensors`(

  `sensor_ts` timestamp, 

  `sensor_id` double, 

  `sensor_0` double, 

  `sensor_1` double, 

  `sensor_2` double, 

  `sensor_3` double, 

  `sensor_4` double, 

  `sensor_5` double, 

  `sensor_6` double, 

  `sensor_7` double, 

  `sensor_8` double, 

  `sensor_9` double, 

  `sensor_10` double, 

  `sensor_11` double)

CLUSTERED BY (sensor_ts) INTO 32 BUCKETS;

 

2. Getting the SDX configuration and Copy to NiFi nodes

Before configuring the flow in NiFi, we'll need to upload some configuration files in the NiFi nodes.

2.1.1. In CDP Console, go to Environment -> <YourEnvironment> and then click on "Data Lake";

2.1.2. In the data lake name in the right menu, click on "View Client Configuration URLs"

carrossoni_9-1633191232714.png

2.1.3. Download the "Hive Metastore" configuration; this will be a zip file containing the files, unzip the file;

2.1.4. Copy (via scp for example) the files core-site.xml, hdfs-site.xml, and hive-site.xml to /tmp folder of each NiFi node in your Datahub environment. Since I've only one node for this example, I will just need to do this once (ex: scp hdfs-site.xml hive-site.xml core-site.xml <cdpworkloadusername>@<publicnifinodeip>:/tmp) and make all files readable in each node (ex: chmod a=r hdfs-site.xml);

3. Configuring the NiFi Streaming Flow

Now we'll use the NiFi Streaming Flow to simulate the sensor data and send via streaming data to the Hive Metastore located in the SDX platform in CDP.

 

First access NiFi in the Data Flow Datahub:

carrossoni_1-1633191232699.png

Figure 2: Open NiFi in Data Hub Cluster

 

Now we'll upload the NiFi template located on Github. This template is based on the Edg2AI workshop, but there's a change to create the data randomly directly in the flow, not using MiNiFi. You can get the flow template here.

 

In the NiFi canvas on the top menu

carrossoni_2-1633191232659.png

Select "Process Group" and drag and drop to the empty canvas. A new menu will appear. Select browse

carrossoni_3-1633191232666.png

to upload the template that you've just downloaded, and then click in ADD.

carrossoni_4-1633191232689.png

Figure 3: Process Group Streaming Created

 

After this, you can double-click in the Streaming Process Group and see that there are more two Process Groups:

1. IoT Data Generator: Used to simulate sensor data, random errors, and put in a Kafka topic.

2. Kafka to Hive: Used to consume the Kafka topic in the first Process Group and send the data via streaming to the table that we've created.

3.1 - Configuring IoT Data Generator Group

Double click in the "IoT Data Generator" group and we'll need to update some configuration to make it work:

3.1.1. In the Operate menu inside the Process Group

 

carrossoni_5-1633191232667.png

click in the engine to configure the "Controller Services":

  • First, click on the lightning button in "JsonRecordSetWriter" and "JsonTreeReader" controllers and enable both controllers;
  • There'll be two controllers called "Default NiFi SSL Context Service", but one is on an "Invalid" state. Click in the right on the "Arrow" icon and then click on "Remove" button to remove this invalid service;
  • At the end you should have this: carrossoni_6-1633191232668.png

3.1.2 Now, close this screen, and in the IoT Data Generator group, double-click on the "PublishKafkaRecord_2" Processor and update the following configuration in the Properties tab:

  • "Kafka Brokers": Change the value to the DNS of your Kafka DNS/port where the data will be sent. Example: messaging-broker:9093, if you're using a Streams Messaging Data Hub, this can be easily located in Streams Messaging Manager;
  • "Kerberos Principal": The principal of your user, you can obtain it via SSH in a NiFi node using your CDP User/password and perform a kinit/klist. More information is available here.
  • "Username": Your workload username;
  • "Password": Your workload password;
  • "SSL Context Service": Select "Default NiFi SSL Context Service" in the drop-down menu;
  • Apply the changes and this will close the configuration;

At the end, your flow may look like this:

carrossoni_7-1633191232709.png

Figure 4: IoT Generator flow

 

Now go back to the initial group "Streaming" using the bottom left menu

carrossoni_8-1633191232700.png

and now, we can configure the next Processor Group to consume the messages and send via streaming to our table.

3.2. Configuring Kafka to Hive Group

Double-click in the "Kafka to Hive" group and we'll need to update some configuration to make it work:
3.2.1. In the Operate menu inside the Process Group

 

carrossoni_10-1633191232690.png

click in the engine to configure the 'Controller Services':

  • Click in the lightning button in "JsonRecordSetWriter" and "JsonTreeReader" controllers and Enable both controllers;
  • In the end, you should have this: carrossoni_11-1633191232670.png

3.2.2. Now close this screen and still in the IoT Data Generator group, double-click on the "ConsumeKafka_2_0" Processor and update the following configuration in the Properties tab:

  • "Kafka Brokers": Change the value to the DNS of your Kafka DNS/port where the data will be sent. Example: messaging-broker:9093,  if you're using a Streams Messaging Data Hub this can be easily located in Streams Messaging Manager;
  • "Kerberos Principal": The principal of your user, you can obtain it via ssh in a nifi node using your CDP User/password and perform a kinit/klist. More information in https://github.com/asdaraujo/cdp-examples#using-kerberos-authentication;
  • "Username": Your workload username;
  • "Password": Your workload password;
  • "SSL Context Service": Select "Default NiFi SSL Context Service" in the drop down menu;
  • Apply the changes and this will close the configuration;

3.2.3. The last processor to configure is the "PutHive3Streaming" processor, double-click in this processor and configure:

  • "Hive Metastore URI": Change the value to the DNS of your Data Lake Master Node DNS/port, ex: thrift://master-node:9083,  this can be located in CDP UI;
  • "Hive Configuration Resources": Check if the paths are valid since it can change, for this, you can ssh in a NiFi node and check the configuration;
  • "Database Name": streaming (or the name of the database that you've chosen to create);
  • "Table": sensors (or the name of the table that you've defined);
  • "Kerberos Principal": Your workload username;
  • "Kerberos Password": Your workload password;
  • Apply the changes and this will close the configuration;

In the end, your flow may look like this:

carrossoni_12-1633191232703.png

Figure 5: Kafka to Hive Flow

Leave the group again and start both Processor groups; you can right-click in each one and click on the Start button.

carrossoni_13-1633191232671.png

Figure 6: Flow in Action!

4. Query the Streaming Data in Cloudera Data Warehouse

Now we can simply see the streaming data directly in the Unified Analytics Virtual Warehouse and/or connect Cloudera Data Visualization or a dashboard via JDBC/ODBC to visualize the data:

carrossoni_14-1633191232716.png

Figure 7: Query the streaming sensor data in Cloudera Data Warehouse

 

And we can monitor in real-time that the data is increasing:

carrossoni_15-1633191232661.png

Figure 8: First Count

carrossoni_16-1633191232701.png

Figure 9: Second Count

Lastly, we can connect Cloudera Data Visualization directly in the table that is being ingested and see how can we quickly drive value on this data:

carrossoni_17-1633191232718.png

Summary 

In this blog post, we've seen how to achieve/create:

  • A flow to create random sensor data, send the message to a topic, consume this topic, and stream to a table;
  • Query this data using Cloudera Data Warehouse;

More details on each concept that we've seen on this post can be found in:

Streaming: Stream data into HIVE like a Boss using NiFi HiveStreaming - Olympics 1896-2008

Compaction: Data compaction

241 Views