Cloudera Employee

Batch Loading with Druid and Hive Integration


This brief tutorial walks through the various ways of ingesting raw data into Druid and exposing the data in Hive for analytics.

For reference, this tutorial was built with CentOS 7 and HDP 3.0.1 with LLAP enabled.

This tutorial will be divided into 6 sections:

  • Data Generation
  • Define the Ingest Spec
  • Submit the Ingest Spec
  • Accessing Data in Hive
  • Double the Data Double the Fun
  • Loading Data With Hive

Why Druid?

Druid is a distributed columnar data store that is optimized for OLAP style queries. Data loaded into Druid is pre-aggregated based upon selected dimensions and metric fields. The data is further partitioned by time to enable efficient temporal access. This time based partitioning can be as long as a year and as small as a second, depending on what the business needs are.
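To make the pre-aggregation idea concrete, here is a minimal Python sketch of rollup: each row's timestamp is truncated to a time bucket, rows are grouped by the chosen dimension, and the metric is summed. It only illustrates the concept and is not how Druid is implemented. The sample rows and the `rollup` helper are made up for this example.

```python
from collections import defaultdict
from datetime import datetime


def rollup(rows, dimension, metric):
    """Pre-aggregate rows by (day, dimension value), summing one metric."""
    totals = defaultdict(float)
    for row in rows:
        # Truncate the timestamp to day granularity before grouping.
        day = datetime.fromisoformat(row["ts"]).date().isoformat()
        totals[(day, row[dimension])] += row[metric]
    return dict(totals)


# Hypothetical raw rows, loosely shaped like TPC-H lineitem records.
raw_rows = [
    {"ts": "1993-05-01T08:15:00", "l_returnflag": "A", "l_quantity": 10.0},
    {"ts": "1993-05-01T17:40:00", "l_returnflag": "A", "l_quantity": 5.0},
    {"ts": "1993-05-01T09:00:00", "l_returnflag": "R", "l_quantity": 2.0},
    {"ts": "1993-05-02T11:30:00", "l_returnflag": "A", "l_quantity": 7.0},
]

rolled_up = rollup(raw_rows, "l_returnflag", "l_quantity")

# Four raw rows collapse into three pre-aggregated rows.
for key, total in sorted(rolled_up.items()):
    print(key, total)
```

In this toy model, loading the same rows a second time doubles the sums but leaves the number of stored rows unchanged, which hints at why query time does not have to grow in step with raw data volume.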

Today, Druid is tightly integrated with Hive, which helps bring high-speed OLAP analytics to the data lake. While Hive enhancements like LLAP deliver great performance improvements, Hive is still a general-purpose SQL engine. Druid complements Hive by providing a high-performance query accelerator for datasets that are accessed by OLAP-style workloads.

Section I: Data Generation

The instructions below download a MapReduce job that generates the TPC-H datasets. The script you run in step 6 also creates Hive external tables for each TPC-H dataset.

1. Log in to the console as root.

2. Become the hive user: su - hive

3. Download the data generator bundle: wget https://github.com/cartershanklin/sandbox-datagen/archive/master.zip

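4. Extract the bundle: tar -zxf datagen.tgz (if the download from step 3 was saved as master.zip, extract it with unzip master.zip instead)

5. Enter the datagen directory: cd datagen (a GitHub branch archive typically extracts to a directory named after the repository and branch, so adjust the directory name if needed)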

6. Generate the data: sh datagen.sh 12

Note that the datagen script's input parameter is the amount of data (in GB) to generate. For small test systems and VMs, 6 GB should be appropriate. The datagen script also creates the Hive tables used in this tutorial. Make sure that the external table file location matches the location of the generated data.

Section II: Define the Ingest Spec

To load batch data into Druid, an ingestion spec must first be created. The ingestion spec tells Druid how to index the data. At first glance, the ingestion spec template looks very complicated, with lots of parameters to configure. Breaking the spec down into its individual components makes it much easier to understand.

93088-ingestspec.png

Note that this method of ingest pushes the index creation burden to the Hadoop cluster. For non-Hadoop Druid deployments, Druid can use its internal indexing methods (via middle managers) to index the data. For more information, see the "Ingestion Methods" section of the Druid documentation.
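As a rough guide to the components of a Hadoop batch ingestion spec, the Python sketch below assembles one as a dictionary and serializes it to JSON. It is not the spec used in this tutorial. The data source name, HDFS path, interval, granularities, pipe delimiter, and column list are all assumptions chosen to resemble a TPC-H lineitem file. The metric names `l_quantity_sum` and `l_tax_sum` follow the queries used later in the article.

```python
import json

# Assumed column order of a pipe-delimited TPC-H lineitem file.
COLUMNS = [
    "l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity",
    "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus",
    "l_shipdate", "l_commitdate", "l_receiptdate", "l_shipinstruct",
    "l_shipmode", "l_comment",
]


def build_ingest_spec(data_source, input_path, interval):
    """Return a Hadoop batch ingestion spec as a plain dictionary."""
    return {
        "type": "index_hadoop",
        "spec": {
            # dataSchema: what the data looks like and how to aggregate it.
            "dataSchema": {
                "dataSource": data_source,
                "parser": {
                    "type": "hadoopyString",
                    "parseSpec": {
                        "format": "tsv",
                        "delimiter": "|",
                        "columns": COLUMNS,
                        "timestampSpec": {"column": "l_shipdate", "format": "auto"},
                        "dimensionsSpec": {
                            "dimensions": ["l_returnflag", "l_linestatus", "l_shipmode"]
                        },
                    },
                },
                "metricsSpec": [
                    {"type": "count", "name": "count"},
                    {"type": "doubleSum", "name": "l_quantity_sum", "fieldName": "l_quantity"},
                    {"type": "doubleSum", "name": "l_tax_sum", "fieldName": "l_tax"},
                ],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "YEAR",
                    "queryGranularity": "DAY",
                    "intervals": [interval],
                },
            },
            # ioConfig: where the raw files live.
            "ioConfig": {
                "type": "hadoop",
                "inputSpec": {"type": "static", "paths": input_path},
            },
            # tuningConfig: left at defaults in this sketch.
            "tuningConfig": {"type": "hadoop"},
        },
    }


spec = build_ingest_spec(
    data_source="lineitem_druid",              # hypothetical data source name
    input_path="/path/to/generated/lineitem",  # placeholder HDFS path
    interval="1992-01-01/1999-01-01",          # assumed to cover the ship dates
)
spec_json = json.dumps(spec, indent=2)
print(spec_json)
```

Writing `spec_json` to a file gives a JSON document that can be adapted to the real data layout before it is submitted.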

Section III: Submit the Ingest Spec

To execute the Druid index build, the ingest spec must be submitted to the indexing service running on the Druid overlord. A simple API call is all that is needed to kick off the job, and an example is provided below. The call requires only the location of the ingest spec JSON file and the URL of the overlord's indexer API endpoint.

93089-apicall.png
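For readers who cannot see the screenshot, the following Python sketch shows one way such a call could be made with only the standard library. It assumes the overlord accepts task submissions as a JSON POST to `/druid/indexer/v1/task`. The host name and port are placeholders, and the helper names are made up for this example.

```python
import json
import urllib.request


def build_submit_request(spec, overlord_url):
    """Build (but do not send) the POST request that submits an ingestion task."""
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/task",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit(request, timeout=30):
    """Send the request and return the overlord's JSON reply as a dictionary."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Placeholder spec and host; in practice, load the spec from its JSON file.
example_spec = {"type": "index_hadoop", "spec": {}}
request = build_submit_request(example_spec, "http://overlord.example.com:8090")
print(request.get_method(), request.full_url)
```

Calling `submit(request)` against a reachable overlord sends the spec. On success the reply should contain the ID of the newly created task, which is useful for looking the task up afterwards.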

To view the running job, navigate to the overlord UI. In the column named “more”, you can view the logs for the indexing process in real time. This is also a great place to troubleshoot issues or errors that arise when indexing data.

93090-overlord.png

The coordinator console UI also shows some details about the indexing process. Navigate to the indexing tab, where the indexing service shows that 1 of 3 indexing slots is occupied by a running job. As shown in the red box, the same logs that were viewed in the overlord UI are available here as well.

93091-coordinatorconsole.png

When the job has finished, the overlord UI shows the task as “completed”.

93094-overlordcompleted.png
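Task completion can also be checked without the UI. The sketch below assumes the overlord exposes a per-task status endpoint at `/druid/indexer/v1/task/{taskId}/status` and that the reply nests the state under `status.status`. The exact payload shape may differ between Druid versions, so treat it as an assumption. The task ID and sample reply are made up.

```python
import json
import urllib.request


def status_url(overlord_url, task_id):
    """URL of the overlord's status endpoint for one task."""
    return f"{overlord_url.rstrip('/')}/druid/indexer/v1/task/{task_id}/status"


def task_state(payload):
    """Pull the state string (e.g. RUNNING, SUCCESS, FAILED) out of a reply."""
    return payload.get("status", {}).get("status", "UNKNOWN")


def fetch_task_state(overlord_url, task_id, timeout=30):
    """Query the overlord once and return the task state (not called here)."""
    url = status_url(overlord_url, task_id)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return task_state(json.loads(response.read().decode("utf-8")))


# Made-up reply for illustration; real replies carry more fields.
sample_reply = {
    "task": "index_hadoop_lineitem_example",
    "status": {"id": "index_hadoop_lineitem_example", "status": "SUCCESS"},
}
print(task_state(sample_reply))
```

A script could call `fetch_task_state` in a loop with a short sleep until the state is no longer `RUNNING`.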

The coordinator console will now show the data source as fully available for access.

93092-coordinatorcomplete.png

Clicking on the new index shows a visual representation of the loaded segments and their respective sizes. Selecting the index from the list in the lower left shows the location of the segment in HDFS as well as the defined metrics and dimensions.

93095-coordinatorsegments.png

Section IV: Accessing Data in Hive

Druid is integrated with Hive via external tables and a Druid storage handler that tells Hive how to talk to Druid. Below is the base DDL for exposing Druid data sources in Hive:

CREATE EXTERNAL TABLE lineitem_druid
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "druid data source name");

The only information required is the name of the Druid data source. Upon creation, Hive reads the attributes (dimensions) and metrics of the Druid data source and adds them to the table's DDL.

93096-query1.png

Next, take a look at the DDL with a SHOW CREATE TABLE command:

93097-query2.png

Notice that all the dimensions and metrics defined in the ingestion spec are now part of the DDL for the Hive table. Finally, query the table:

select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;

93098-query3.png

When the above example was run, the physical query execution took 0.031 seconds! More time was spent compiling the query than on the actual physical execution. Running the same query with LLAP took nearly 13 seconds.

93099-query4.png

Subsequent runs were sub-second due to caching, but this comparison shows the near-instantaneous performance you get with Druid.

Section V: Double the Data Double the Fun

Next, experiment with doubling the data to see the performance difference. If this tutorial is being run on a smaller hardware configuration, it is recommended that LLAP be disabled until the Druid processing is complete. This ensures sufficient resources for dataset creation and indexing and avoids long index build times.

select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;

93100-query5.png

The physical execution took only 0.037 seconds! How was that speed still attainable after the data was doubled? It comes down to how Druid stores and serves data.

Druid’s performance comes from its ability to store data in a highly compressed columnar format along with embedded bitmap indexes to enable fast searching across millions to billions of records. These data segments are also partitioned by time to enable efficient range based data access.
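The bitmap index idea can be illustrated with a small Python sketch. Each distinct value of a column gets a bitmap whose set bits mark the rows containing that value, so a filter on several columns becomes a bitwise AND. This is a toy model that uses Python integers as bitmaps and does not reflect Druid's actual compressed bitmap format. The column data is made up.

```python
def build_bitmap_index(values):
    """Map each distinct value to an integer whose set bits mark matching rows."""
    index = {}
    for row_number, value in enumerate(values):
        index[value] = index.get(value, 0) | (1 << row_number)
    return index


def matching_rows(bitmap):
    """Expand a bitmap back into a sorted list of row numbers."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


# Two hypothetical dimension columns, one entry per row.
returnflag = ["A", "R", "A", "N", "R", "A"]
shipmode = ["AIR", "AIR", "RAIL", "AIR", "SHIP", "AIR"]

flag_index = build_bitmap_index(returnflag)
mode_index = build_bitmap_index(shipmode)

# WHERE l_returnflag = 'A' AND l_shipmode = 'AIR' becomes a bitwise AND.
hits = flag_index["A"] & mode_index["AIR"]
print(matching_rows(hits))  # prints [0, 5]
```

Only the rows whose bits survive the AND need to be read, which is what keeps filtered scans cheap as the row count grows.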

Druid also optimizes physical IO in how it serves data to end applications. Druid data segments are distributed evenly and replicated across the historical nodes of a Druid cluster. The data segments are then memory-mapped into local node memory for optimized IO. If your data cannot fit entirely in memory, Druid performs physical IO to retrieve the data segments from local disk. It is recommended that the underlying physical disk storage for the cache be SSD or NVMe storage.

Druid's performance is largely driven by how much compute and memory are available on your historical nodes. More cores and memory to process the data segments yield greater performance. Druid’s shared-nothing architecture makes it very easy to scale out. If more performance is required, simply add historical nodes to the cluster.

Brokers and historical nodes can also employ local query caching (LRU) to cache query results. This accelerates queries by eliminating IO. These caches can be deployed locally on brokers and historical nodes or stored in an external key-value store. If they are local, more memory increases query cache effectiveness.
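As a reminder of how a least-recently-used (LRU) cache behaves, here is a minimal Python sketch keyed by query text. It is a generic illustration of the eviction policy, not Druid's cache implementation. The class name and the capacity, which counts entries and not bytes, are assumptions made for the example.

```python
from collections import OrderedDict


class LRUQueryCache:
    """Tiny LRU cache: the least recently used entry is evicted first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()

    def get(self, query):
        """Return the cached result, or None on a cache miss."""
        if query not in self._entries:
            return None
        # A hit makes this entry the most recently used one.
        self._entries.move_to_end(query)
        return self._entries[query]

    def put(self, query, result):
        """Store a result, evicting the oldest entry if over capacity."""
        self._entries[query] = result
        self._entries.move_to_end(query)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)


cache = LRUQueryCache(capacity=2)
cache.put("q1", "result1")
cache.put("q2", "result2")
cache.get("q1")             # q1 becomes the most recently used entry
cache.put("q3", "result3")  # capacity exceeded, so q2 is evicted
print(cache.get("q2"))      # prints None
```

A larger capacity keeps more results resident, which matches the point above that more memory improves cache effectiveness.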

93101-arch.png

Section VI: Loading Data With Hive

Druid can also be loaded with Hive. The previous sections demonstrated the manual ingest method. That method is very flexible and can be used to load Druid indexes in many different deployments.

Since most data in big data deployments already lives in Hive, it often makes the most sense to use traditional SQL DDL/DML commands to create and load Druid data sources. With this method, the metrics, filters, transformations, and dimensions can all be created using SQL.

Below is a CREATE TABLE AS command that creates the data source in Druid, loads it, and creates the external table definition in Hive. The SQL approach greatly simplifies creating Druid data sources, with no complex JSON documents to configure.

LLAP MUST BE ENABLED FOR DRUID INTEGRATION!

93104-ctas.png
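For readers who cannot see the screenshot, the Python sketch below assembles a plausible CREATE TABLE AS statement as text. It is a reconstruction under assumptions, not the author's exact statement. The column expressions are taken from the INSERT statement shown later in this section, the aliases on the two casts are guesses, and the `druid.segment.granularity` and `druid.query.granularity` values are illustrative.

```python
# Column expressions borrowed from the INSERT statement later in this section;
# the alias on the L_ORDERKEY cast is an assumption.
SELECT_COLUMNS = [
    "cast(L_SHIPDATE as timestamp) as `__time`",
    "cast(L_ORDERKEY as string) as L_ORDERKEY",
    "L_QUANTITY",
    "L_EXTENDEDPRICE",
    "L_DISCOUNT",
    "L_TAX",
    "L_RETURNFLAG",
    "L_LINESTATUS",
    "L_SHIPMODE",
    "L_COMMITDATE",
    "L_RECEIPTDATE",
    "L_EXTENDEDPRICE * (1 - L_DISCOUNT) as SUM_DISC_PRICE",
    "L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX) as SUM_CHARGE",
]


def build_ctas(table, source_table, segment_granularity="MONTH", query_granularity="DAY"):
    """Render a Druid-backed CREATE TABLE AS SELECT statement as a string."""
    columns = ",\n  ".join(SELECT_COLUMNS)
    return (
        f"CREATE TABLE {table}\n"
        "STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'\n"
        "TBLPROPERTIES (\n"
        f'  "druid.segment.granularity" = "{segment_granularity}",\n'
        f'  "druid.query.granularity" = "{query_granularity}"\n'
        ")\n"
        "AS\n"
        "SELECT\n"
        f"  {columns}\n"
        f"FROM {source_table};"
    )


ctas_sql = build_ctas("lineitem_druid_internal", "lineitem")
print(ctas_sql)
```

The printed statement can be pasted into a Hive session. The first selected column must be a timestamp named `__time`, which Druid uses as its time dimension.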

Below is the output from executing the CREATE TABLE statement. Don’t be alarmed if there is a noticeable pause after the Tez job has completed. The larger your data, the longer the pause.

93105-query6.png

Now run some SQL against the table:

select sum(l_quantity),sum(l_tax),l_returnflag from lineitem_druid_internal group by l_returnflag;

93103-query7.png

For loading incremental data, a simple INSERT INTO statement is all that’s required. This makes loading Druid incrementally far easier than customizing ingestion specs.

INSERT INTO lineitem_druid_internal
SELECT
cast(L_SHIPDATE as timestamp) as `__time`,
cast(L_ORDERKEY as string),
L_QUANTITY,
L_EXTENDEDPRICE,
L_DISCOUNT,
L_TAX,
L_RETURNFLAG,
L_LINESTATUS,
L_SHIPMODE,
L_COMMITDATE,
L_RECEIPTDATE,
L_EXTENDEDPRICE * (1 - L_DISCOUNT) as SUM_DISC_PRICE,
L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX) as SUM_CHARGE
FROM lineitem
WHERE L_SHIPDATE = '1993-05-01';

Lastly, join the Druid lineitem table with the orders table stored in Hive. It takes a little longer since the query is federated across multiple systems, but it shows the flexibility of Hive and the intelligence of its cost-based optimizer (CBO).

SELECT
SUM(SUM_CHARGE) AS CHARGE,
O_CLERK
FROM lineitem_druid_internal
JOIN orders ON
cast(orders.O_ORDERKEY as string)=lineitem_druid_internal.L_ORDERKEY
WHERE `__TIME` = '1993-09-01'
GROUP BY O_CLERK
ORDER BY CHARGE DESC
LIMIT 10;

93106-query8.png

Conclusion

The recent advancements in HDP 2.6.x and 3.x have really simplified the process of creating Druid data sources and loading them incrementally. While defining ingestion specs is simple enough, Hive's SQL interface dramatically lowers the barrier to entry for users just getting started with Druid.

Now users can take their most demanding OLAP workloads in Hive and easily move them onto a platform designed for high-performance analytics.


Comments
New Contributor

great article @jtaras, Unfortunately it seems that there is a broken link to https://github.com/hortonworks/data-tutorials/raw/master/tutorials/hdp/interactive-sql-on-hadoop-wit... Would you mind to upload the file again and edit the link. Thanks!

Cloudera Employee

I've updated the link.

Version history
Last update:
‎09-16-2022 01:44 AM