This article demonstrates how to install Anaconda on an
HDP 3.0.1 instance and how to configure Zeppelin to use the
Anaconda Python libraries with Apache Spark. Pre-requisites: the bzip2 library must be
installed prior to installing Anaconda. Step 1. Download and Install Anaconda From the following link, download the Anaconda installer: https://www.anaconda.com/download/#linux It
is a large bash script, around 600MB in size. Land this file in an appropriate
directory such as /tmp. Once downloaded, give the installer executable permissions. Once that is complete, the installer can be executed. Run it
as a user with sudo privileges. Follow
the prompts and accept the license agreements. As a best practice,
install Anaconda in a directory other than the default; /opt is the location I use
for my installations. At the very end of the installation, it will prompt you
to initialize the install in the installing user's .bashrc file. If you have a multi-node environment, re-run this process
on the remaining nodes. Step 2. Configure Zeppelin Log into Zeppelin and open the interpreters page. Scroll
down to the Spark interpreter and look for zeppelin.pyspark.python. This
parameter points to the Python binary that is used when executing Python
applications. It defaults to the Python binary bundled with the OS and
will need to be updated to the Anaconda Python binary. Next, navigate to the location where you installed Anaconda; in this example, the location is /opt/anaconda2/bin. Set the parameter to the full path of the python binary in the
/opt/anaconda2/bin directory. Make the changes and save the settings for the interpreter.
Finally, restart the interpreter and you should be good to go! If you run into any issues, make sure that Zeppelin has the
correct file permissions to run the new Python libraries.
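The download-and-install steps in Step 1 can be sketched as shell commands. The installer filename and version below are assumptions; use whatever the download page currently offers:

```shell
# Land the installer in an appropriate directory such as /tmp
cd /tmp
wget https://repo.anaconda.com/archive/Anaconda2-5.3.0-Linux-x86_64.sh

# Give the installer executable permissions, then run it
# as a user with sudo privileges
chmod +x Anaconda2-5.3.0-Linux-x86_64.sh
sudo ./Anaconda2-5.3.0-Linux-x86_64.sh

# When prompted for an install location, choose a directory other
# than the default (e.g. /opt/anaconda2), accept the license
# agreements, and let it initialize the installing user's .bashrc.
```

On a multi-node environment, repeat the same sequence on each remaining node.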
Druid Kafka Integration Service + Hive
This tutorial will focus on ways of ingesting raw streaming data into Druid and exposing the data in Hive for high-performance analytics.
For Reference, this tutorial was built with CentOS 7, HDP 3.0.1 w/ LLAP enabled and Kafka installed and running
This tutorial will be divided into 6 sections:
1. Ambari Configuration
2. Kafka Setup
3. Setup Data Stream
4. Generating and Running the Supervisor Spec
5. Hive Integration
6. Create Streaming Druid Data Sources through Hive

Kafka Integration Service
With the Kafka Ingestion Service, Druid can guarantee that all records received will be processed and stored. That’s in contrast to the older push method of Tranquility: if an event was received after the window period with Tranquility, the record was dropped. This meant that a secondary batch process needed to be run to true up the data. With the Kafka Ingestion Service, if an event is received late, a segment will be created for it regardless of when it was received. This is hugely powerful, as it removes the need to run a secondary batch process.

Section I: Ambari Configuration
The Kafka Ingestion Service is still in Technical Preview as of today, so it doesn’t come bundled as part of the platform. As such, some minor configuration changes need to be applied.
In Ambari, navigate to the druid service and click on the configs tab. Now, using the filter, search for “druid.extensions.loadList”.
For this parameter, add “druid-kafka-indexing-service” to the list. This parameter tells Druid to load the listed extensions on startup across the cluster.
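As a sketch, the resulting property value might look like the following (the other entry shown is illustrative; keep whatever extensions your cluster already lists):

```
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service"]
```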
Click “Save” and restart any necessary services.

Section II: Setup Kafka
Next, create a Kafka topic for the tutorial. On one of the available Kafka brokers, navigate to the directory containing the client utilities. Use the utility kafka-topics.sh to create a topic on your Kafka cluster.
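A minimal sketch of the topic creation follows. The topic name, ZooKeeper host, partition count, and replication factor are assumptions; adjust them for your cluster. On HDP, the client utilities typically live under /usr/hdp/current/kafka-broker/bin:

```shell
cd /usr/hdp/current/kafka-broker/bin

# Create the topic for the tutorial (names and hosts are assumptions)
./kafka-topics.sh --create \
  --zookeeper zk-host:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic sales_data
```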
Section III: Setup Data Stream
The data generator can be downloaded from the following link. The data generator is already configured to generate the data set for the tutorial. Only update the topic name and add the kafka broker address. Once that is done, run the python script to begin generating events.
If everything is configured properly, the kafka-console-consumer should show the events being written to the topic.
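A sketch of what the generator's event-producing logic can look like. The field names mirror the columns queried later in this tutorial (rep_id, qty, trxn_amt, discount_amt); the real generator's schema, and the Kafka producer wiring shown in the comments, are assumptions:

```python
import json
import random
import time

def make_sale_event(rep_ids=range(16)):
    """Build one synthetic sales event as a dict ready for JSON encoding."""
    qty = random.randint(1, 10)
    unit_price = round(random.uniform(5.0, 500.0), 2)
    discount = round(random.uniform(0.0, 0.2), 2)
    return {
        # ISO-8601 timestamp so Druid can use it as the __time column
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "rep_id": random.choice(list(rep_ids)),
        "qty": qty,
        "trxn_amt": round(qty * unit_price, 2),
        "discount_amt": round(qty * unit_price * discount, 2),
    }

# Publishing to Kafka (assumed broker address and topic) would look like:
# from kafka import KafkaProducer   # pip install kafka-python
# producer = KafkaProducer(
#     bootstrap_servers="kafkabroker:6667",
#     value_serializer=lambda v: json.dumps(v).encode("utf-8"))
# producer.send("sales_data", make_sale_event())
```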
Section IV: Generating and Running the Supervisor Spec
Since the middle managers are conducting the actual indexing of incoming Kafka events, a configuration file needs to be delivered to the overlord node. The overlord node will take the configuration file and create tasks that will be executed on the middle manager nodes.
To submit the Kafka ingestion spec, the JSON configuration file must be submitted to the overlord via its REST API. If successful, a JSON response is returned with the ID of the KIS application.
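A sketch of that submission, with a deliberately minimal supervisor spec. The topic, broker address, schema fields, and 30-minute taskDuration are assumptions based on this tutorial; a real spec carries more tuning options:

```shell
# Write a minimal Kafka ingestion supervisor spec (values are assumptions)
cat > supervisor-spec.json <<'EOF'
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "sales_data_30min",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": { "column": "timestamp", "format": "iso" },
        "dimensionsSpec": { "dimensions": ["rep_id"] }
      }
    },
    "metricsSpec": [
      { "type": "doubleSum", "name": "trxn_amt", "fieldName": "trxn_amt" },
      { "type": "doubleSum", "name": "discount_amt", "fieldName": "discount_amt" },
      { "type": "longSum", "name": "qty", "fieldName": "qty" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "sales_data",
    "taskDuration": "PT30M",
    "consumerProperties": { "bootstrap.servers": "kafkabroker:6667" }
  },
  "tuningConfig": { "type": "kafka" }
}
EOF

# POST the spec to the overlord's supervisor endpoint; a JSON response
# containing the supervisor ID indicates success
curl -X POST -H 'Content-Type: application/json' \
  -d @supervisor-spec.json \
  http://overlord-host:8090/druid/indexer/v1/supervisor
```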
Once the ingestion spec is submitted, navigate to the druid overlord node. A new record is now displayed under Running Supervisors with the name of the data source that was defined in the ingestion spec. There is also a record under Running Tasks.
The coordinator console however tells a different story. There isn’t an entry for the data source that’s seen in the overlord. (sales_data_30min is missing)
This is because the coordinator console only shows data managed by historical nodes. Since the Kafka ingestion service was just started, and the taskDuration is set to 30 minutes, there won’t be an entry here for at least that amount of time. The taskDuration controls how long a task spends reading a Kafka topic before publishing the indexed data to the historical nodes. The data segments are available, they just aren’t managed by the historical nodes yet; they are under the domain of the middle managers until that taskDuration expires. The data, however, is still accessible even though it isn’t on historical nodes. In fact, the data segments can be seen by logging into the middle managers themselves. Navigate to /apps/druid/tasks.
This will show the currently running task. Going into that directory, navigate to work/persist, where the index file can be viewed. Whenever a new query is submitted, instead of going to the historical nodes, the query is routed to the middle managers and this directory to get the latest streamed data.
After 30 minutes have elapsed, log back into the coordinator console to view the newly added segments on the historical nodes.
Section V: Hive Integration
Now that data is being streamed into Kafka, collected, and indexed in Druid, it can be accessed in Hive. Log into Hive and create a table for the newly created Druid data source.
CREATE EXTERNAL TABLE sales_data
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "sales_data_30min");
Next, run a series of quick count(*) queries on the table. Watch how with each query the result set increases! All the new data streaming into Druid is immediately accessible! Behind the scenes, the query is being directed to two locations for the count: the historical nodes and the middle managers. The historical nodes hold the committed index segments while the middle managers hold the most recent data within the given taskDuration.
Even simple aggregations are applied across historical and real-time data sets. The below query came back in .014 seconds!
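The aggregation referenced above isn't reproduced; assuming the column names used later in this tutorial, it would be something of this shape:

```sql
-- Aggregate across both historical and real-time segments
-- (column names are taken from the later examples in this tutorial)
select sum(trxn_amt), sum(qty), rep_id
from sales_data
group by rep_id;
```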
Section VI: Create Streaming Druid Data Sources through Hive
Like in the previous tutorial, Hive has many core integrations with Druid. One such integration is through external tables. Through external tables, Hive can mimic the Druid supervisor spec that was created earlier, all without having to hand-code any complex JSON documents or interact with a REST API.
The DDL for the Kafka Ingestion Service follows the same external table model as the standard Hive-on-Druid table, with some small differences.
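The DDL itself isn't reproduced above; a sketch of what it can look like follows. The table, topic, and broker names, the column list, and the exact property set are assumptions; check the Hive/Druid documentation for your HDP version:

```sql
-- Hive external table that drives a Druid Kafka ingestion
-- (names, columns, and properties are assumptions)
CREATE EXTERNAL TABLE sales_data_kafka (
  `__time` timestamp,
  rep_id string,
  qty int,
  trxn_amt double,
  discount_amt double
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "druid.segment.granularity" = "HOUR",
  "kafka.bootstrap.servers" = "kafkabroker:6667",
  "kafka.topic" = "sales_data"
);
```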
While the DDL has been executed, there is still an additional step to deploy the Kafka ingestion service. Once initially deployed, the Kafka ingestion service isn’t running; an ALTER TABLE command must be executed to “turn on” the service.
-- to start the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
-- to stop the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
Once the ALTER TABLE command to start the Kafka ingestion service has been executed, navigate to the overlord console to see the newly running supervisor and the running middle manager task.
Next, query the table to see the events being added to the real time indexes.
select count(*) from sales_data_kafka;
Run some simple aggregates and watch the data increase with each query.
select sum(discount_amt),sum(qty),rep_id from sales_data_kafka group by rep_id;
One limitation of Druid is that it doesn’t do joins. There is some very limited support for runtime lookups, but joining multiple data sources together is not possible today. This is why the integration with Hive is so powerful: with Hive, any dataset accessible to Hive can be joined with data persisted in Druid. Hive’s intelligent cost-based optimizer will push as much processing as possible down to Druid to maximize performance and accelerate query processing. A small example can be viewed here. Below is a lookup table for the sales rep_id that’s found in the streaming dataset created in the previous step.
create table sales_reps (rep_id int, rep_name string, quota int);
insert into sales_reps (rep_id, rep_name, quota ) values (0 , "Matt Ryan", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (1 , "Matt Bryant", 150000);
insert into sales_reps (rep_id, rep_name, quota ) values (2 , "Julio Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (3 , "Calvin Ridley", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (4 , "Mohammed Sanu", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (5 , "Deion Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (6 , "Tevin Coleman", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (7 , "Keanu Neal", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (8 , "Alex Mac", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (9 , "Ryan Schraeder", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (10 , "Robert Alford", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (11 , "Desmond Trufant", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (12 , "Ricardo Allen", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (13 , "De'Vondre Cambell", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (14 , "Grady Jarett", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (15 , "Takkarist Mickinley", 100000);
The following query joins the Druid data source with a local Hive table to determine each sales rep's name and their quota attainment.
select sr.rep_name,
       (sales.sale_amt / sr.quota) as quota_attainment
from (select sum(qty) as qty_sold,
             sum(trxn_amt) as sale_amt,
             cast(rep_id as int) as rep_id
      from sales_data_kafka
      group by rep_id) sales
join sales_reps sr on sales.rep_id = sr.rep_id;
The query ran in 1.4 seconds. Further runs of the same query brought the runtime below 1 second. LLAP keeps local data in a hot cache, which helps accelerate data acquisition.
The integration between Kafka and Druid has greatly simplified real-time streaming architectures. Instead of stitching together multiple tools to build a complex real-time architecture, the combination of Druid + Hive + Kafka can simplify development and streamline integration.
Batch Loading with Druid and Hive Integration

This brief tutorial will walk through the various ways of ingesting raw data into Druid and exposing the data in Hive for analytics. For reference, this tutorial was built with CentOS 7, HDP 3.0.1 w/ LLAP enabled.

This tutorial will be divided into 6 sections:

1. Data Generation
2. Define the Ingest Spec
3. Submit the Ingest Spec
4. Accessing Data in Hive
5. Double the Data Double the Fun
6. Loading Data With Hive

Why Druid?

Druid is a distributed columnar data store that is optimized for OLAP-style queries. Data loaded into Druid is pre-aggregated based upon selected dimension and metric fields. The data is further partitioned by time to enable efficient temporal access. This time-based partitioning can be as long as a year and as small as a second, depending on what the business needs are. Today, Druid is tightly integrated with Hive, which helps bring high-speed OLAP analytics to the data lake. While Hive enhancements like LLAP deliver great performance improvements, Hive is still a general-purpose SQL engine. Druid complements Hive by providing a performance query accelerator for certain datasets that are accessed by OLAP-style workloads.

Section I: Data Generation

The following instructions will download a MapReduce job for generating the TPC-H datasets. The script you execute below will also create Hive external tables for each TPC-H dataset.

1. Log in to the console as root.
2. Become the hive user: su - hive
3. Download the data generator bundle: wget https://github.com/cartershanklin/sandbox-datagen/archive/master.zip
4. Extract the bundle: unzip master.zip
5. Enter the datagen directory: cd datagen
6. Generate the data: sh datagen.sh 12

Please note that the datagen script input parameter represents the amount of data (GB) to be generated. For small test systems and VMs, 6GB should be appropriate. The datagen script will also create the appropriate Hive tables for the tutorial.
Please ensure that the external table file location matches that of the generated data.

Section II: Define the Ingest Spec

To load batch data into Druid, an ingestion spec must first be created. The ingestion spec tells Druid how to properly index the data. At first glance, the ingestion spec template looks very complicated, with lots of parameters to configure. Breaking the ingestion spec down into individual components makes it much clearer to understand. Please note that this method of ingest pushes the index creation burden onto the Hadoop cluster. For non-Hadoop Druid deployments, Druid can leverage internal indexing methods (via middle managers) to index the data. For more information, see the "Ingestion Methods" section in the following link: Ingestion Methods

Section III: Submit the Ingest Spec

To execute the Druid index build, the ingest spec must be submitted to the indexing service running on the Druid overlord. A simple API call is all that is needed to kick off the job. All that is needed is the location of the ingest spec JSON file and the URL of the overlord's indexer API endpoint.

To view the running job, navigate to the overlord UI. In the column named “more” you can view the logs for the indexing process running in real time. This is also a great location for troubleshooting issues/errors that may arise when indexing data. The coordinator console UI will also show some details related to the indexing process. Navigate to the indexing tab and the indexing service will show that 1 out of 3 indexing slots is occupied by a running job. The same logs that were viewed in the overlord UI are available here as well. When the job has finished, the overlord UI shows the task as “completed”. The coordinator console will now show the data source as fully available for access.
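A sketch of the spec and the API call that submits it. The host name, file name, columns, and granularities below are assumptions; the real tutorial spec defines the full TPC-H lineitem schema:

```shell
# Write a minimal Hadoop batch ingestion spec (values are assumptions)
cat > ingest-spec.json <<'EOF'
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "lineitem_druid",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "tsv",
          "timestampSpec": { "column": "l_shipdate", "format": "auto" },
          "dimensionsSpec": { "dimensions": ["l_returnflag", "l_linestatus"] },
          "columns": ["l_shipdate", "l_returnflag", "l_linestatus",
                      "l_quantity", "l_tax"]
        }
      },
      "metricsSpec": [
        { "type": "doubleSum", "name": "l_quantity_sum", "fieldName": "l_quantity" },
        { "type": "doubleSum", "name": "l_tax_sum", "fieldName": "l_tax" }
      ],
      "granularitySpec": {
        "segmentGranularity": "MONTH",
        "queryGranularity": "DAY"
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": { "type": "static", "paths": "/tmp/tpch/lineitem/" }
    }
  }
}
EOF

# POST the spec to the overlord's batch indexing endpoint to kick off the job
curl -X POST -H 'Content-Type: application/json' \
  -d @ingest-spec.json \
  http://overlord-host:8090/druid/indexer/v1/task
```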
Clicking on the new index will show a visual representation of the segments loaded and their respective sizes. Selecting the index from the list in the lower left will show the location of the segment in HDFS as well as the defined metrics and dimensions.

Section IV: Accessing Data in Hive

Druid is integrated with Hive via external tables and a Druid storage handler that tells Hive how to talk to Druid. Below is the base DDL for exposing Druid data sources in Hive:

CREATE EXTERNAL TABLE lineitem
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "druid data source name");

The only information required for this is the name of the Druid data source, that’s it. Upon creation, Hive will import the attributes and metrics of the Druid data source into the DDL. Next, take a look at the DDL with a SHOW CREATE TABLE command. Notice that all the dimensions and metrics defined in the ingestion spec are now part of the DDL for the Hive table. Finally, query the table:

select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;

When the above example was run, the physical query execution was .031 seconds! More time was spent compiling the query than on the actual physical execution! Running the same query with LLAP took nearly 13 seconds. Subsequent runs were in the sub-second range due to caching, but this shows the instantaneous performance you get with Druid.

Section V: Double the Data Double the Fun

Next, experiment with doubling the data to see the performance difference. If this tutorial is being run on a smaller hardware configuration, it is recommended that LLAP be disabled until the Druid processing is completed. This ensures proper resources for dataset creation and indexing, and avoids long index build times due to insufficient resources.
select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;

The physical execution only took .037 seconds! How was that speed still attainable even after we doubled the data? It all has to do with how Druid stores and serves up data. Druid’s performance comes from its ability to store data in a highly compressed columnar format along with embedded bitmap indexes, enabling fast searching across millions to billions of records. These data segments are also partitioned by time to enable efficient range-based data access.

Druid physically optimizes IO by how it serves data to end applications. Druid data segments are distributed evenly and replicated across the historical nodes of a Druid cluster. The data segments are then memory-mapped into local node memory for optimized IO. If your data cannot fit entirely in memory, Druid will perform physical IO to retrieve the data segments from local disk. It's recommended that the underlying physical disk storage for the cache be SSD or NVMe. Druid's performance is largely driven by how much compute and memory are available on your historical nodes; more cores and memory to process the data segments will yield greater performance. Druid’s shared-nothing architecture makes it very easy to scale out. If more performance is required, simply scale out the cluster (historical nodes). Brokers and historical nodes can also employ local query caching (LRU) to cache results from queries, which can further accelerate queries by providing IO elimination. These caches can be deployed on brokers and historical nodes or stored in an external key-value store. If done locally on brokers or historical nodes, more memory will increase query cache effectiveness.

Section VI: Loading Data With Hive

Druid can also be loaded with Hive. What was demonstrated in the previous sections was accomplished via the manual ingest method.
This method is very flexible and can be used to load Druid indexes in many different deployments. But with most data already in Hive in most big data deployments, it makes the most sense to leverage traditional SQL DDL/DML commands to create and load Druid data sources. With this method, the metrics, filters, transformations, and dimensions can all be created using SQL. Below is a CREATE TABLE AS command that creates the data source in Druid, loads it, and creates the external table definition in Hive. The SQL approach grossly simplifies the process of creating Druid data sources, without having to configure any complex JSON docs. LLAP MUST BE ENABLED FOR DRUID INTEGRATION!

There will be a noticeable pause after the Tez job has completed, so don’t be alarmed if there is a long pause after the Tez processing has finished. The larger your data, the longer the pause. Now run some SQL against the table:

select sum(l_quantity),sum(l_tax),l_returnflag from lineitem_druid_internal group by l_returnflag;

For loading incremental data, a simple INSERT INTO command is all that’s required. This makes it incredibly easy to incrementally load Druid, and far easier than customizing ingestion specs.

INSERT INTO lineitem_druid_internal
SELECT
  cast(L_SHIPDATE as timestamp) as `__time`,
  cast(L_ORDERKEY as string),
  L_QUANTITY,
  L_EXTENDEDPRICE,
  L_DISCOUNT,
  L_TAX,
  L_RETURNFLAG,
  L_LINESTATUS,
  L_SHIPMODE,
  L_COMMITDATE,
  L_RECEIPTDATE,
  L_EXTENDEDPRICE * (1 - L_DISCOUNT) as SUM_DISC_PRICE,
  L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX) as SUM_CHARGE
FROM lineitem
WHERE L_SHIPDATE = '1993-05-01';

Lastly, join the Druid lineitem table with the orders table stored in Hive.
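For reference, the CREATE TABLE AS statement described above isn't reproduced; a sketch follows. The column list mirrors the INSERT INTO example, while the table name, granularity properties, and date filter are assumptions:

```sql
-- Sketch of a CTAS that creates and loads a Druid data source from Hive
-- (granularity properties and WHERE clause are assumptions)
CREATE TABLE lineitem_druid_internal
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "druid.segment.granularity" = "MONTH",
  "druid.query.granularity" = "DAY"
)
AS
SELECT
  cast(L_SHIPDATE as timestamp) as `__time`,
  cast(L_ORDERKEY as string) as L_ORDERKEY,
  L_QUANTITY,
  L_EXTENDEDPRICE,
  L_DISCOUNT,
  L_TAX,
  L_RETURNFLAG,
  L_LINESTATUS,
  L_SHIPMODE,
  L_COMMITDATE,
  L_RECEIPTDATE,
  L_EXTENDEDPRICE * (1 - L_DISCOUNT) as SUM_DISC_PRICE,
  L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX) as SUM_CHARGE
FROM lineitem
WHERE L_SHIPDATE < '1993-05-01';
```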
It takes a little bit longer since we're federating the query across multiple systems, but it shows the flexibility of Hive and the intelligence of the CBO.

SELECT SUM(SUM_CHARGE) AS CHARGE, O_CLERK
FROM lineitem_druid_internal
JOIN orders ON cast(orders.O_ORDERKEY as string) = lineitem_druid_internal.L_ORDERKEY
WHERE `__TIME` = '1993-09-01'
GROUP BY O_CLERK
ORDER BY CHARGE DESC
LIMIT 10;

Conclusion

The recent advancements in HDP 2.6.x and 3.x have really simplified the process of creating Druid data sources and loading them on an incremental basis. While defining ingestion specs is simple enough, Hive, through the use of SQL, dramatically lowers the barrier to entry for users just getting started with Druid. Now, users can take their most demanding OLAP workloads in Hive and easily transpose them onto a platform designed for high-performance analytics.