Created on 11-02-2018 03:55 AM - edited 09-16-2022 01:44 AM
This tutorial focuses on ways of ingesting raw streaming data into Druid and exposing that data in Hive for high-performance analytics.
For reference, this tutorial was built with CentOS 7 and HDP 3.0.1, with LLAP enabled and Kafka installed and running.
This tutorial is divided into six sections.
With the Kafka Ingestion Service, Druid can guarantee that every record it receives will be processed and stored. That is in contrast to the older push-based method of Tranquility: with Tranquility, an event received after the window period was dropped, so a secondary batch process had to be run to true up the data. With the Kafka Ingestion Service, a segment is created for a late event regardless of when it was received. This is hugely powerful because it removes the need to run a secondary batch process.
The Kafka Ingestion Service is still in Technical Preview as of this writing, so it doesn’t come bundled as part of the platform. As a result, a few minor configuration changes need to be applied.
In Ambari, navigate to the Druid service and click the Configs tab. Using the filter, search for “druid.extensions.loadList”.
Add “druid-kafka-indexing-service” to the list for this parameter. This parameter tells Druid which extensions to load when the cluster starts up.
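The loadList value is a JSON array of extension names, so the change amounts to appending one string to that array. A minimal Python sketch of the edit, assuming you copy the current value out of Ambari; the existing entries in the example are hypothetical.

```python
import json


def add_extension(load_list: str, extension: str = "druid-kafka-indexing-service") -> str:
    """Return a new loadList JSON string with `extension` appended exactly once."""
    extensions = json.loads(load_list)  # loadList is a JSON array of strings
    if extension not in extensions:
        extensions.append(extension)
    return json.dumps(extensions)


if __name__ == "__main__":
    # Hypothetical current value; paste the real one from Ambari instead.
    current = '["druid-datasketches", "druid-hdfs-storage"]'
    print(add_extension(current))
```

Checking for the name before appending keeps the value idempotent if the step is repeated.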
Click “Save” and restart any necessary services.
Next, create a Kafka topic for the tutorial. On one of the Kafka brokers, navigate to the directory containing the Kafka client utilities and use the kafka-topics.sh utility to create a topic on the Kafka cluster.
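A minimal Python sketch that builds (and optionally runs) the topic-creation command. It assumes the usual HDP client location /usr/hdp/current/kafka-broker/bin and a Kafka version whose kafka-topics.sh still registers topics through ZooKeeper (the --zookeeper flag); the topic name and ZooKeeper address in the example are hypothetical.

```python
import shlex
import subprocess

# Usual HDP location of the Kafka client utilities (assumption; adjust for your install).
KAFKA_BIN = "/usr/hdp/current/kafka-broker/bin"


def create_topic_cmd(topic, zookeeper, partitions=1, replication_factor=1):
    """Build the kafka-topics.sh argument list for creating `topic`."""
    return [
        f"{KAFKA_BIN}/kafka-topics.sh",
        "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def create_topic(topic, zookeeper, **kwargs):
    """Run kafka-topics.sh; raises CalledProcessError if the command fails."""
    subprocess.run(create_topic_cmd(topic, zookeeper, **kwargs), check=True)


if __name__ == "__main__":
    # Hypothetical topic name and ZooKeeper address; prints the command instead of running it.
    print(shlex.join(create_topic_cmd("sales_data", "zk-host.example.com:2181")))
```

Building the argument list separately from running it makes it easy to print and review the exact command before touching the cluster.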
The data generator can be downloaded from the following link. It is already configured to generate the data set for this tutorial; only the topic name and the Kafka broker address need to be updated. Once that is done, run the Python script to begin generating events.
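The downloadable generator is not reproduced here. As a rough stand-in, the sketch below emits JSON events carrying the columns this tutorial queries later (rep_id, qty, trxn_amt, discount_amt) plus a timestamp field; the timestamp field name and the value ranges are assumptions. It prints one event per line, so its output can be piped into kafka-console-producer.sh without a Kafka client library.

```python
import json
import random
import time
from datetime import datetime, timezone


def make_event(rng: random.Random) -> dict:
    """Build one hypothetical sales event.

    rep_id, qty, trxn_amt and discount_amt match the columns queried later;
    the value ranges are made up for illustration.
    """
    qty = rng.randint(1, 10)
    trxn_amt = round(qty * rng.uniform(5.0, 50.0), 2)
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "rep_id": rng.randint(0, 15),  # IDs 0-15 line up with the sales_reps lookup table
        "qty": qty,
        "trxn_amt": trxn_amt,
        "discount_amt": round(trxn_amt * rng.uniform(0.0, 0.2), 2),
    }


def generate(count, seed=42, delay=0.0):
    """Yield `count` events as JSON strings, optionally pausing between them."""
    rng = random.Random(seed)
    for _ in range(count):
        yield json.dumps(make_event(rng))
        if delay:
            time.sleep(delay)


if __name__ == "__main__":
    # One JSON object per line, e.g. pipe into:
    #   kafka-console-producer.sh --broker-list <broker>:6667 --topic <topic>
    for line in generate(count=5):
        print(line, flush=True)
```

Port 6667 is the usual HDP broker port; raise `count` and set `delay` to simulate a steady stream.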
Data Generator
If everything is configured properly, the kafka-console-consumer utility should output the events written to the topic.
Since the middle managers do the actual indexing of incoming Kafka events, an ingestion spec (a JSON configuration file) needs to be delivered to the overlord node. The overlord takes the spec and creates tasks that are executed on the middle manager nodes.
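For illustration, here is a minimal Kafka supervisor spec built as a Python dict. The top-level layout (type, dataSchema, tuningConfig, and an ioConfig with topic, consumerProperties and taskDuration) follows the Druid Kafka indexing service; the timestamp column, dimension and metric names and the granularities are assumptions chosen to match the columns queried later, and taskDuration is set to PT30M to match the 30-minute window used in this tutorial.

```python
import json


def kafka_supervisor_spec(datasource, topic, bootstrap_servers, task_duration="PT30M"):
    """Return a minimal Kafka supervisor spec as a dict (illustrative field choices)."""
    return {
        "type": "kafka",
        "dataSchema": {
            "dataSource": datasource,
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "json",
                    "timestampSpec": {"column": "timestamp", "format": "auto"},
                    "dimensionsSpec": {"dimensions": ["rep_id"]},
                },
            },
            "metricsSpec": [
                {"type": "count", "name": "count"},
                {"type": "longSum", "name": "qty", "fieldName": "qty"},
                {"type": "doubleSum", "name": "trxn_amt", "fieldName": "trxn_amt"},
                {"type": "doubleSum", "name": "discount_amt", "fieldName": "discount_amt"},
            ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "HOUR",
                "queryGranularity": "NONE",
            },
        },
        "tuningConfig": {"type": "kafka"},
        "ioConfig": {
            "topic": topic,
            "consumerProperties": {"bootstrap.servers": bootstrap_servers},
            "taskCount": 1,
            "replicas": 1,
            # How long each task reads from Kafka before publishing its segments.
            "taskDuration": task_duration,
        },
    }


if __name__ == "__main__":
    # Topic name and broker address are hypothetical.
    spec = kafka_supervisor_spec("sales_data_30min", "sales_data", "broker.example.com:6667")
    print(json.dumps(spec, indent=2))
```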
To submit the Kafka ingestion spec, the JSON configuration file must be sent to the overlord via its REST API. If the submission succeeds, the overlord returns a JSON response containing the ID of the new Kafka Ingestion Service supervisor.
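The submission is a single HTTP POST of the spec to the overlord's /druid/indexer/v1/supervisor endpoint with a JSON content type. The sketch below uses only the Python standard library; the overlord host is hypothetical and 8090 is assumed as the overlord port. The response body has the form {"id": "<supervisor id>"}.

```python
import json
import urllib.request


def build_submit_request(overlord, spec):
    """Build a POST request for the overlord's supervisor endpoint.

    `overlord` is host:port, e.g. "overlord.example.com:8090" (hypothetical).
    """
    return urllib.request.Request(
        url=f"http://{overlord}/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_spec(overlord, spec, timeout=30):
    """Send the spec and return the supervisor ID from the JSON response."""
    with urllib.request.urlopen(build_submit_request(overlord, spec), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))["id"]


if __name__ == "__main__":
    # Build, but do not send, a request for a placeholder spec.
    req = build_submit_request("overlord.example.com:8090", {"type": "kafka"})
    print(req.get_method(), req.full_url)
```

Separating request construction from sending keeps the network call in one small function and lets the request be inspected offline.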
Once the ingestion spec is submitted, open the Druid overlord console. A new record is now displayed under Running Supervisors with the name of the data source defined in the ingestion spec. There is also a new record under Running Tasks.
The coordinator console, however, tells a different story: there is no entry for the data source that appears in the overlord (sales_data_30min is missing).
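The same gap can be checked over REST. The overlord's GET /druid/indexer/v1/supervisor returns the IDs of running supervisors (for a Kafka supervisor, the ID is its data source name), and the coordinator's GET /druid/coordinator/v1/datasources returns the data sources served by historical nodes. The sketch below diffs the two lists; host names are hypothetical, and 8090 and 8081 are assumed as the overlord and coordinator ports.

```python
import json
import urllib.request


def get_json(url, timeout=10):
    """GET a URL and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def awaiting_handoff(supervisor_ids, historical_datasources):
    """Data sources being ingested by a supervisor but not yet served by historicals."""
    return sorted(set(supervisor_ids) - set(historical_datasources))


def check(overlord="overlord.example.com:8090", coordinator="coordinator.example.com:8081"):
    """Query both services and return the data sources still awaiting handoff."""
    supervisors = get_json(f"http://{overlord}/druid/indexer/v1/supervisor")
    datasources = get_json(f"http://{coordinator}/druid/coordinator/v1/datasources")
    return awaiting_handoff(supervisors, datasources)


if __name__ == "__main__":
    # Offline example: right after submission, only the overlord knows the data source.
    print(awaiting_handoff(["sales_data_30min"], ["wikipedia"]))
```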
The data source is missing because the coordinator console only shows data managed by the historical nodes. Since the Kafka ingestion service was just started and taskDuration is set to 30 minutes, there won't be an entry here for at least that long. The taskDuration controls how long a task spends reading a Kafka topic before publishing the indexed data to the historical nodes. The data segments already exist; they just aren't managed by the historical nodes yet. Until the taskDuration expires, they remain under the domain of the middle managers. The data is still accessible, even though it isn't on the historical nodes. In fact, the data segments can be seen by logging into the middle managers themselves and navigating to /apps/druid/tasks.
This directory shows the task that is currently running. Inside that task's directory, navigate to work/persist to view the index files. Whenever a new query is submitted, the portion covering the latest streamed data is routed to the middle managers and this directory instead of to the historical nodes.
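The same inspection can be scripted. The sketch below lists each task directory under /apps/druid/tasks (the path used in this tutorial) together with the contents of its work/persist subdirectory. The layout is taken from the steps above and may differ between Druid versions.

```python
from pathlib import Path

# Task directory used in this tutorial; the layout may differ between Druid versions.
TASK_BASE = "/apps/druid/tasks"


def persist_dirs(base=TASK_BASE):
    """Map each task ID under `base` to the sorted entries of its work/persist directory."""
    result = {}
    base_path = Path(base)
    if not base_path.is_dir():
        return result
    for task_dir in sorted(base_path.iterdir()):
        persist = task_dir / "work" / "persist"
        if persist.is_dir():
            result[task_dir.name] = sorted(p.name for p in persist.iterdir())
    return result


if __name__ == "__main__":
    for task_id, entries in persist_dirs().items():
        print(task_id, entries)
```

Tasks without a work/persist directory are skipped, so only tasks that have actually persisted data show up.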
After 30 minutes have elapsed, log back into the coordinator console to view the newly added segments on the historical nodes.
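How long to wait is simple arithmetic: a task that starts reading at time t stops reading and begins publishing at t + taskDuration, and the handoff to the historical nodes completes some time after that, so the result is a lower bound. The sketch below handles only the hour/minute/second subset of ISO-8601 durations that values such as PT30M use; it is not a full ISO-8601 parser, and the start time in the example is hypothetical.

```python
import re
from datetime import datetime, timedelta

# Matches durations such as "PT30M", "PT1H" or "PT1H15M30S".
_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def parse_task_duration(value: str) -> timedelta:
    """Parse the hour/minute/second subset of ISO-8601 durations."""
    match = _DURATION.match(value)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {value!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def earliest_publish_time(task_start: datetime, task_duration: str) -> datetime:
    """When the task stops reading and starts publishing; handoff follows later."""
    return task_start + parse_task_duration(task_duration)


if __name__ == "__main__":
    start = datetime(2018, 11, 2, 12, 0)  # hypothetical task start time
    print(earliest_publish_time(start, "PT30M").isoformat())
```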
Now that data is being streamed into Kafka and collected and indexed in Druid, it can be accessed in Hive. Log in to Hive and create a table for the newly created Druid data source.
CREATE EXTERNAL TABLE sales_data
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "sales_data_30min");
Next, run a series of quick count(*) queries against the table. Watch how the result increases with each query: all the new data streaming into Druid is immediately accessible! Behind the scenes, the count is directed to two locations, the historical nodes and the middle managers. The historical nodes hold the committed index segments, while the middle managers hold the most recent data within the current taskDuration.
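Hive generates its own Druid queries, so the exact query will differ, but the fan-out can be illustrated with a native Druid timeseries count posted to the broker, which queries both the historical nodes and the real-time tasks and merges the partial results. In the sketch below the broker address is hypothetical (8082 is assumed as the broker port). The count aggregator counts Druid rows, which can be fewer than raw events when ingestion-time rollup is enabled.

```python
import json
import urllib.request


def timeseries_count_query(datasource, interval="1000-01-01/3000-01-01"):
    """A native Druid timeseries query counting rows over (effectively) all time."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": "all",
        "intervals": [interval],
        "aggregations": [{"type": "count", "name": "rows"}],
    }


def run_query(broker, query, timeout=30):
    """POST a native query to the broker, which fans it out and merges the results."""
    req = urllib.request.Request(
        url=f"http://{broker}/druid/v2/",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def total_rows(result):
    """Sum per-bucket counts; with granularity "all" there is at most one bucket."""
    return sum(bucket["result"]["rows"] for bucket in result)


if __name__ == "__main__":
    print(json.dumps(timeseries_count_query("sales_data_30min"), indent=2))
    # Against a live cluster (hypothetical host):
    #   total_rows(run_query("broker.example.com:8082", timeseries_count_query("sales_data_30min")))
```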
Even simple aggregations are applied across both the historical and real-time data sets. The query below came back in 0.014 seconds!
As in the previous tutorial, Hive has many core integrations with Druid, one of which is external tables. Through external tables, Hive can mimic the Druid supervisor spec that was created in step 5, all without having to write any complex JSON documents or interact with a REST API.
The DDL for the Kafka Ingestion Service follows the same external table model as a standard Hive-on-Druid table, with a few small differences.
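As a sketch of what such a DDL can look like, the helper below renders one as a string so it can be templated per topic. It assumes the table property names used by Hive's Druid storage handler at the time (kafka.bootstrap.servers, kafka.topic, druid.kafka.ingestion.*, druid.segment.granularity, druid.query.granularity) and the required `__time` timestamp column; verify them against your Hive version. The column list, topic and broker address in the example are hypothetical.

```python
def kafka_druid_ddl(table, columns, topic, bootstrap_servers, extra_props=None):
    """Render a Hive CREATE EXTERNAL TABLE statement backed by Druid Kafka ingestion.

    `columns` is a list of (name, hive_type) pairs; the `__time` column is added first.
    """
    cols = ",\n  ".join(
        f"`{name}` {htype}" for name, htype in [("__time", "timestamp")] + list(columns)
    )
    props = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.topic": topic,
        "druid.kafka.ingestion.useEarliestOffset": "true",
        "druid.segment.granularity": "HOUR",
        "druid.query.granularity": "MINUTE",
    }
    props.update(extra_props or {})
    prop_lines = ",\n  ".join(f'"{k}" = "{v}"' for k, v in props.items())
    return (
        f"CREATE EXTERNAL TABLE {table} (\n  {cols}\n)\n"
        "STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'\n"
        f"TBLPROPERTIES (\n  {prop_lines}\n);"
    )


if __name__ == "__main__":
    print(kafka_druid_ddl(
        "sales_data_kafka",
        [("rep_id", "string"), ("qty", "int"), ("trxn_amt", "double"), ("discount_amt", "double")],
        topic="sales_data",
        bootstrap_servers="broker.example.com:6667",
        extra_props={"druid.kafka.ingestion.taskDuration": "PT30M"},
    ))
```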
Even after the DDL has been executed, one more step is needed to deploy the Kafka ingestion service: when first deployed, the service isn't running. An ALTER TABLE command must be executed to “turn on” the service.
-- to start the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
-- to stop the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
Once the ALTER TABLE command to start the Kafka ingestion service has been executed, navigate to the overlord console to see the newly running supervisor and the running middle manager task.
Next, query the table to see events being added to the real-time indexes.
select count(*) from sales_data_kafka;
Run some simple aggregates and watch the data increase with each query.
select sum(discount_amt),sum(qty),rep_id from sales_data_kafka group by rep_id;
One limitation of Druid is that it doesn’t support joins. There is some very limited support for runtime lookups, but joining multiple data sources together is not possible today. This is why the integration with Hive is so powerful: any dataset accessible to Hive can be joined with data persisted in Druid. Hive’s cost-based optimizer pushes as much processing as possible down to Druid to maximize performance and accelerate query processing. A small example follows. Below is a lookup table for the sales rep_id found in the streaming dataset that was created in the previous step.
create table sales_reps (rep_id int, rep_name string, quota int);
insert into sales_reps (rep_id, rep_name, quota ) values (0 , "Matt Ryan", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (1 , "Matt Bryant", 150000);
insert into sales_reps (rep_id, rep_name, quota ) values (2 , "Julio Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (3 , "Calvin Ridley", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (4 , "Mohammed Sanu", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (5 , "Deion Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (6 , "Tevin Coleman", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (7 , "Keanu Neal", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (8 , "Alex Mac", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (9 , "Ryan Schraeder", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (10 , "Robert Alford", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (11 , "Desmond Trufant", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (12 , "Ricardo Allen", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (13 , "De'Vondre Cambell", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (14 , "Grady Jarett", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (15 , "Takkarist Mickinley", 100000);
The following query joins the Druid data source with a local Hive table to determine each sales rep's name and quota attainment.
select
  sales.qty_sold,
  sales.sale_amt,
  sr.rep_name,
  (sales.sale_amt / sr.quota) as quota_attainment
from (
  select sum(qty) as qty_sold, sum(trxn_amt) as sale_amt, cast(rep_id as int) as rep_id
  from sales_data_kafka
  group by rep_id) sales
join sales_reps sr on
  sr.rep_id = sales.rep_id;
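To make the query's semantics concrete, here is a small in-memory Python model of what it computes: sales are summed per rep_id (the subquery that Hive pushes down to Druid), then inner-joined to the lookup table, and sale_amt is divided by quota. It is not how Hive executes the query; the sample events are made up, and only the first two reps from the lookup table are used.

```python
from collections import defaultdict


def quota_attainment(events, reps):
    """Mirror the Hive query: aggregate sales per rep, then join to the rep lookup.

    events: iterable of dicts with rep_id, qty and trxn_amt (the Druid side).
    reps:   dict of rep_id -> (rep_name, quota) (the Hive sales_reps table).
    Returns (qty_sold, sale_amt, rep_name, quota_attainment) tuples.
    """
    qty_sold = defaultdict(int)
    sale_amt = defaultdict(float)
    for e in events:  # GROUP BY rep_id, with rep_id cast to int as in the subquery
        rep_id = int(e["rep_id"])
        qty_sold[rep_id] += e["qty"]
        sale_amt[rep_id] += e["trxn_amt"]
    rows = []
    for rep_id in sorted(qty_sold):  # inner join: unmatched rep_ids are dropped
        if rep_id in reps:
            name, quota = reps[rep_id]
            rows.append((qty_sold[rep_id], sale_amt[rep_id], name, sale_amt[rep_id] / quota))
    return rows


if __name__ == "__main__":
    reps = {0: ("Matt Ryan", 100000), 1: ("Matt Bryant", 150000)}
    events = [
        {"rep_id": "0", "qty": 2, "trxn_amt": 500.0},
        {"rep_id": "0", "qty": 1, "trxn_amt": 250.0},
        {"rep_id": "1", "qty": 4, "trxn_amt": 1500.0},
        {"rep_id": "99", "qty": 1, "trxn_amt": 10.0},  # no matching rep: dropped by the join
    ]
    for row in quota_attainment(events, reps):
        print(row)
```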
The query ran in 1.4 seconds, and further runs of the same query brought the runtime below 1 second. LLAP's caching keeps local data in a hot cache, which helps accelerate data access.
The integration between Kafka and Druid has greatly simplified real-time streaming architectures. Instead of stitching together multiple tools to build a complex real-time architecture, the combination of Druid, Hive and Kafka simplifies development and streamlines integration.