Created on 10-02-2021 09:53 AM - edited on 10-04-2021 11:57 PM by subratadas
In this article, I'll show how to stream data into CDP Public Cloud using Cloudera Dataflow/Streaming Datahub and query the data using Cloudera Data Warehouse.
For this exercise you'll need Cloudera Data Platform with:
Cloudera Data Warehouse;
Datahub Flow Management;
Datahub Streams Messaging;
This exercise and its flow are based on the sensor data pipeline in edge2ai-workshop, in a modified version that I'll share in another repository.
1. Create the Streaming Table in Cloudera Data Warehouse
For this exercise we need two virtual warehouses:
A Hive virtual warehouse: Used only to perform the compaction process (Streaming data ingest). For this example, I've named it latam-hive;
A Unified Analytics Virtual Warehouse: Used to query and visualize the data. Unified Analytics is a very exciting new feature for CDW customers at no extra fee! You can learn more here. For this example, I've named it latam-vw.
Figure 1: Virtual Warehouses used in this exercise.
Now I can access the Hue interface in latam-hive VW to create the table used to store the sensor data that we'll be streaming:
create database streaming;
CREATE TABLE `streaming`.`sensors`(
CLUSTERED BY (sensor_ts) INTO 32 BUCKETS;
2. Getting the SDX Configuration and Copying It to the NiFi Nodes
Before configuring the flow in NiFi, we'll need to upload some configuration files to the NiFi nodes.
2.1.1. In CDP Console, go to Environment -> <YourEnvironment> and then click on "Data Lake";
2.1.2. In the data lake name in the right menu, click on "View Client Configuration URLs"
2.1.3. Download the "Hive Metastore" configuration. This is a zip file containing the configuration files; unzip it;
2.1.4. Copy (via scp, for example) the files core-site.xml, hdfs-site.xml, and hive-site.xml to the /tmp folder of each NiFi node in your Datahub environment, and make all files readable on each node (ex: chmod a=r hdfs-site.xml). Since I have only one node for this example, I need to do this only once (ex: scp hdfs-site.xml hive-site.xml core-site.xml <cdpworkloadusername>@<publicnifinodeip>:/tmp).
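For clusters with more than one NiFi node, the copy in step 2.1.4 can be scripted. The following is a minimal sketch rather than a tested deployment script: the node name and workload username are hypothetical placeholders, and the DRY_RUN switch and the run helper are my own additions so that the commands can be reviewed before anything is executed.

```shell
#!/bin/sh
# Sketch: copy the SDX client configuration files to /tmp on each NiFi node
# and make them readable by all users.
# ASSUMPTIONS (not from the article): node names, username, DRY_RUN switch.

NIFI_NODES="nifi-node-1.example.com"   # hypothetical: space-separated list of NiFi nodes
WORKLOAD_USER="my-workload-user"       # hypothetical: your CDP workload username
CONFIG_FILES="core-site.xml hdfs-site.xml hive-site.xml"
DRY_RUN="${DRY_RUN:-1}"                # 1 = only print the commands (default)

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

copy_configs() {
  for node in $NIFI_NODES; do
    # Build the list of remote paths, e.g. " /tmp/core-site.xml ..."
    remote_files=""
    for file in $CONFIG_FILES; do
      remote_files="$remote_files /tmp/$file"
    done
    # $CONFIG_FILES is intentionally unquoted so it splits into separate arguments.
    run scp $CONFIG_FILES "${WORKLOAD_USER}@${node}:/tmp"
    run ssh "${WORKLOAD_USER}@${node}" "chmod a=r$remote_files"
  done
}

copy_configs
```

Run it from the folder where the zip file was extracted. With the default DRY_RUN=1 it only prints the scp and ssh commands; set DRY_RUN=0 to execute them.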
3. Configuring the NiFi Streaming Flow
Now we'll use the NiFi streaming flow to simulate the sensor data and send it via streaming to the Hive Metastore located in the SDX platform in CDP.
First, access NiFi in the Data Flow Datahub:
Figure 2: Open NiFi in Data Hub Cluster
Now we'll upload the NiFi template located on GitHub. This template is based on the Edge2AI workshop, but it has been changed to generate random data directly in the flow instead of using MiNiFi. You can get the flow template here.
In the top menu of the NiFi canvas, select "Process Group" and drag and drop it onto the empty canvas. A new menu will appear. Select Browse to upload the template that you've just downloaded, and then click ADD.
Figure 3: Process Group Streaming Created
After this, you can double-click on the Streaming Process Group and see that it contains two more Process Groups:
1. IoT Data Generator: Used to simulate sensor data and random errors, and to publish them to a Kafka topic.
2. Kafka to Hive: Used to consume the Kafka topic written by the first Process Group and stream the data to the table that we've created.
3.1. Configuring IoT Data Generator Group
Double-click on the "IoT Data Generator" group. We'll need to update some configuration to make it work:
3.1.1. In the Operate menu inside the Process Group, click on the gear icon to configure the "Controller Services":
First, click on the lightning button in "JsonRecordSetWriter" and "JsonTreeReader" controllers and enable both controllers;
There'll be two controllers called "Default NiFi SSL Context Service", but one is in an "Invalid" state. Click on the "Arrow" icon on the right and then click the "Remove" button to remove this invalid service;
At the end you should have this:
3.1.2. Now, close this screen, and in the IoT Data Generator group, double-click on the "PublishKafkaRecord_2" Processor and update the following configuration in the Properties tab:
"Kafka Brokers": Change the value to the DNS name and port of the Kafka broker where the data will be sent (for example, messaging-broker:9093). If you're using a Streams Messaging Data Hub, this can be easily located in Streams Messaging Manager;
"Kerberos Principal": The principal of your user. You can obtain it by connecting via SSH to a NiFi node with your CDP user/password and running kinit/klist. More information is available here.
"Username": Your workload username;
"Password": Your workload password;
"SSL Context Service": Select "Default NiFi SSL Context Service" in the drop-down menu;
Apply the changes; this will close the configuration.
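To find the value for "Kerberos Principal", the kinit/klist step mentioned above can be reduced to a one-line filter. This is a minimal sketch, assuming the MIT Kerberos klist output format, which contains a "Default principal:" line; the helper name extract_principal, the username, and the sample output are hypothetical and only illustrate the parsing.

```shell
#!/bin/sh
# Sketch: print the Kerberos principal of the current ticket.
# On a NiFi node, authenticate first and then pipe klist into the helper:
#   kinit my-workload-user      # hypothetical username; prompts for the workload password
#   klist | extract_principal

# Reads klist output on stdin and prints the text after "Default principal:".
extract_principal() {
  sed -n 's/^Default principal:[[:space:]]*//p'
}

# Example with hypothetical klist output (not captured from a real cluster):
printf '%s\n' \
  'Ticket cache: FILE:/tmp/krb5cc_1000' \
  'Default principal: my-workload-user@EXAMPLE.COM' \
  | extract_principal
```

The printed value (user@REALM) is what goes into the "Kerberos Principal" property.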
At the end, your flow may look like this:
Figure 4: IoT Generator flow
Now go back to the initial group "Streaming" using the bottom-left menu. We can now configure the next Process Group to consume the messages and stream them to our table.
3.2. Configuring Kafka to Hive Group
Double-click on the "Kafka to Hive" group. We'll need to update some configuration to make it work:
3.2.1. In the Operate menu inside the Process Group, click on the gear icon to configure the "Controller Services":
Click on the lightning button in the "JsonRecordSetWriter" and "JsonTreeReader" controllers and enable both controllers;
In the end, you should have this:
3.2.2. Now close this screen and, still in the Kafka to Hive group, double-click on the "ConsumeKafka_2_0" Processor and update the following configuration in the Properties tab:
"Kafka Brokers": Change the value to the DNS name and port of your Kafka broker (for example, messaging-broker:9093). If you're using a Streams Messaging Data Hub, this can be easily located in Streams Messaging Manager;