09-10-2019
09:00 AM
When developing NiFi flows with Python, it can be a headache to build customized configurations for your processors. Every processor is different and exposes many parameters to tune depending on your flow's needs and requirements. To accelerate development, I use the developer tools in the Chrome browser to extract the configuration payloads from the REST calls the NiFi application makes. Open your browser and point it to your NiFi instance. Once in the UI, right-click on the gray bar and select "Inspect". This opens a developer view of the UI. Next, click on the Network tab to view and debug network activity; all network-related traffic is displayed there. Notice that you can see all the REST calls NiFi makes behind the scenes, which is quite helpful when customizing processors. When you drag a processor onto the canvas and parameterize it, a REST call is made to create the processor, and that call carries a configuration payload that we can capture and reuse in our NiFi Python development. Example: create a simple GenerateFlowFile processor. When you drag it onto the canvas, you'll see an entry posted in the Network tab; it is the REST call made to create the processor. If you scroll down to the request payload, you get a view of the configuration that was submitted with the processor. Next, update the processor by changing the flow file custom text. You'll immediately see a new event posted in the Network tab, and its request payload contains the JSON representation of the processor's configuration. Using this technique you can easily extract the configuration properties for any processor and use them when designing programmatic flows with Python.
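For example, the configuration block captured from the request payload for a GenerateFlowFile processor can be dropped straight into a Python dict and reused later with nipyapi. The snippet below is only an illustrative sketch; the property key comes from the processor, but the values shown are examples:
##illustrative config captured from the browser's Network tab (values are examples)
generate_ff_config = {
    "properties": {"generate-ff-custom-text": "hello world"},
    "schedulingPeriod": "10 sec",
    "schedulingStrategy": "TIMER_DRIVEN"
}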
09-10-2019
08:06 AM
1 Kudo
Building Basic Flows with Nipyapi
If building flows in NiFi wasn't easy enough, developers can use the nipyapi library to build flows with Python. With this model, one can programmatically build flows without ever touching the canvas, which comes in handy when deploying highly configurable and repeatable flows. In the simple example below, we'll walk through using the Python libraries to build a very simple flow in which a GenerateFlowFile processor writes data to the local file system. It's contrived, but it gives a picture of the flexibility afforded by the nipyapi libraries. For more information on the Python libraries discussed here, please go to: https://nipyapi.readthedocs.io/en/latest/index.html
First things first, import the required library and classes for the example.
from nipyapi import canvas,config
from random import randrange
Next, set the configuration details of your NiFi environment. Here, I'm setting the API connection details for both NiFi and NiFi Registry.
config.nifi_config.host = 'http://localhost:8080/nifi-api'
config.registry_config.host = 'http://localhost:18080/nifi-registry-api'
In NiFi, the canvas is where you visually place the various processors you use in your flows. With Python you can't visually place anything, so we must supply the coordinates where we want the processors placed. Here, I'm using a simple random function to derive a coordinate between 0 and 4000, which randomly places the processors on the canvas.
location_x = randrange(0,4000)
location_y = randrange(0,4000)
location = (location_x, location_y)
NiFi has a ton of configuration properties for each individual processor; some are required while others are optional. To cover the required properties for this example, we pass in a configuration that tells NiFi how to set up each processor. See my article on using the Chrome browser developer tools to help generate these processor configurations.
##send in configuration details for customizing the processors
processor_GenFlowFile_config = {"properties":{"generate-ff-custom-text":"hello world"},"schedulingPeriod":"10 sec","schedulingStrategy":"TIMER_DRIVEN"}
processor_PutFile_config = {"properties":{"Directory":"${dir}","Conflict Resolution Strategy":"replace","Maximum File Count":100},"autoTerminatedRelationships":["failure","success"]}
In order to place a new process group on the canvas, we need to find the root process group, which happens to be the canvas you see when you log into NiFi. This is quite simple:
##get the root process group ID
root_id = canvas.get_root_pg_id()
Next, we pull the metadata about this process group so we can create process groups on the canvas.
##get the root process group
root_process_group = canvas.get_process_group(root_id, 'id')
With all this information, we can now create a new process group for our test flow to live in.
##create the process group
new_processor_group = canvas.create_process_group(root_process_group, 'test_process_group', location, 'this is a test')
Next, we need to get the processors we'd like to use. This code searches the list of processor types and retrieves the processors we selected; the search criterion is the Java processor class name.
##get the processors
processor_GenFlowFile = canvas.get_processor_type('org.apache.nifi.processors.standard.GenerateFlowFile', identifier_type='name')
processor_PutFile = canvas.get_processor_type('org.apache.nifi.processors.standard.PutFile', identifier_type='name')
In this step, we place the processors in our newly created process group. For each processor, I'm passing in the process group object, the processor object, the location where I want the processor to live, and the configuration defined above.
##put processors on canvas in specified process group
GenFlowFile = canvas.create_processor(new_processor_group, processor_GenFlowFile,(randrange(0,4000),randrange(0,4000)),name=None,config=processor_GenFlowFile_config)
PutFile = canvas.create_processor(new_processor_group, processor_PutFile,(randrange(0,4000),randrange(0,4000)),name=None,config=processor_PutFile_config)
Once the processors have been created, linkages between them can be established. Here, the processors created in the previous step are used to establish the connection.
##create connection between processors
canvas.create_connection(GenFlowFile, PutFile ,relationships=None, name="linkage")
The second-to-last step involves setting any variables for the process group. This lets you parameterize the configurations used in the prior steps. If you look at the step where the configurations were defined, you'll see that the PutFile Directory property is set to the NiFi variable ${dir}. All that needs to be done is to set that variable in the variable registry to parameterize the flow.
##get variable registry
var_reg = canvas.get_variable_registry(new_processor_group, True)
##set new variables
canvas.update_variable_registry(new_processor_group,([('dir', '/tmp/test')]))
Finally, start the process group.
##start process group
canvas.schedule_process_group(new_processor_group.id, True)
To see the newly running flow, go to the canvas. The process group should be running. Mine shows an error, and that's due to the file limit I built into my configuration (no more than 100 files in a directory). Going into the process group you should see the two processors we defined. Given the random placement of the processors, they may be spread far apart, so you may need to rearrange them on the canvas to see them clearly. There you have it: a simple flow built with nipyapi! I hope this demonstrates the powerful capabilities of the library and how it can be used to create customizable flows...it certainly beats having to hand-write REST calls!
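When you're done experimenting, the same library can tear the flow down again. Below is a minimal cleanup sketch under the assumption that the flow was created exactly as above (note that get_process_group by name can return a list if several groups share a name, and force=True removes the processors and connections inside the group):
from nipyapi import canvas, config
config.nifi_config.host = 'http://localhost:8080/nifi-api'
##look the process group up by name, stop it, then remove it
pg = canvas.get_process_group('test_process_group', 'name')
canvas.schedule_process_group(pg.id, False)
canvas.delete_process_group(pg, force=True)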
08-19-2019
02:02 PM
2 Kudos
HDFS Snapshots
In Part 1 we looked at the basics of HDFS snapshots. In this section we'll look at what happens when managing multiple snapshots in a given directory and what to look out for... primarily the phantom data that lingers when you delete a dataset that still has one or more snapshots linked to it.
Working With Multiple Snapshots
First, let's take a new snapshot on the directory we were previously working on:
hdfs dfs -createSnapshot /tmp/snapshot_dir/dir1 20190819v1
Next, let's add another file to the same base directory that holds file1.txt:
hdfs dfs -put file2.txt /tmp/snapshot_dir/dir1
Let's pretend that 8 hours later you take another snapshot of the same directory. Here we named the snapshot v2:
hdfs dfs -createSnapshot /tmp/snapshot_dir/dir1 20190819v2
Let's pause and take a look at the size of the directory:
hdfs dfs -du -h /tmp/snapshot_dir
Now let's delete file1.txt:
hdfs dfs -rm /tmp/snapshot_dir/dir1/file1.txt
Take a look at the directory size now:
hdfs dfs -du -h /tmp/snapshot_dir
Only one file remains in the directory (file2.txt), but we still see the physical file represented in the second number...those phantom files! Let's go a little further: load another file and take another snapshot. Pretend it's 8 hours later and name the snapshot v3:
hdfs dfs -put file3.txt /tmp/snapshot_dir/dir1
hdfs dfs -createSnapshot /tmp/snapshot_dir/dir1 20190819v3
Next, take a look at the directory sizes for reference:
hdfs dfs -du -h /tmp/snapshot_dir
hdfs dfs -du -h /tmp/snapshot_dir/dir1
Note how the replicated size still shows phantom data being present. The two files currently in the directory have a combined size (with replication) of 111 bytes; the other 36 bytes are phantom data that remains because the first snapshot still references the deleted file. That data will remain UNTIL the last snapshot referencing the file is deleted. Next, let's take a look at the snapshots in the directory:
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot/20190819v2
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot/20190819v3
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot/20190819v1
Notice how the v2 snapshot also has a reference to file1.txt. Even if we delete the v1 snapshot, the phantom data will still remain. To test this, let's delete the v1 snapshot:
hdfs dfs -deleteSnapshot /tmp/snapshot_dir/dir1 20190819v1
As expected, deleting the first snapshot didn't remove the phantom data. Now let's delete the v2 snapshot and see what happens:
hdfs dfs -deleteSnapshot /tmp/snapshot_dir/dir1 20190819v2
As you can see, the phantom data is gone. The last snapshot holding a reference to it was deleted, and the data went with it.
Conclusion
HDFS snapshots can be a very powerful tool, but you must exercise caution when using them. Protection from data deletion is great, but it comes at a cost. If you implement HDFS snapshotting, you must also build a management framework for keeping track of snapshots to ensure proper HDFS space utilization.
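As a starting point for such a framework, here is a minimal sketch of a hypothetical helper (not part of HDFS) that simply shells out to the same commands used above: it lists a directory's snapshots and deletes any whose names carry a YYYYMMDD prefix, like the ones in this article, older than a retention window.
import subprocess
from datetime import datetime, timedelta

def list_snapshots(path):
    # parse snapshot names out of `hdfs dfs -ls <path>/.snapshot`
    out = subprocess.run(["hdfs", "dfs", "-ls", path + "/.snapshot"],
                         capture_output=True, text=True, check=True).stdout
    return [line.split("/")[-1] for line in out.splitlines() if "/.snapshot/" in line]

def expire_snapshots(path, days=7):
    # delete snapshots whose leading YYYYMMDD stamp is older than the retention window
    cutoff = datetime.now() - timedelta(days=days)
    for name in list_snapshots(path):
        try:
            taken = datetime.strptime(name[:8], "%Y%m%d")
        except ValueError:
            continue  # skip snapshots that don't follow the naming convention
        if taken < cutoff:
            subprocess.run(["hdfs", "dfs", "-deleteSnapshot", path, name], check=True)

expire_snapshots("/tmp/snapshot_dir/dir1", days=7)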
08-19-2019
01:04 PM
1 Kudo
HDFS Snapshots
HDFS snapshots are a great way to back up important data on HDFS. They're extremely easy to implement and they help safeguard your data when a user or admin accidentally deletes something. In the article below, we'll walk through some simple examples of using snapshots and some of the gotchas to look out for when implementing them.
Part 1: Understanding Snapshots
First, let's create some files and directories for testing:
echo "Hello World" > file1.txt
echo "How are you" > file2.txt
echo "hdfs snapshots are great" > file3.txt
hdfs dfs -mkdir /tmp/snapshot_dir
hdfs dfs -mkdir /tmp/snapshot_dir/dir1
Next, let's put file1.txt in the directory:
hdfs dfs -put file1.txt /tmp/snapshot_dir/dir1
Creating a snapshot is a really simple process. You will need a user with superuser permissions to make a directory snapshottable. This step only enables the directory for snapshots; it does not create a snapshot by itself.
hdfs dfsadmin -allowSnapshot /tmp/snapshot_dir/dir1
Next, let's take a look at the size of the directory and the file we loaded:
hdfs dfs -du -h /tmp/snapshot_dir
The output contains two numbers: the first is the size of the file and the second is the size of the file multiplied by its replication. With HDFS's default 3x replication, the second number is three times the first (file1.txt is 12 bytes, so 36 bytes with replication).
Next, let's create the snapshot. To do this we need to identify the directory we want to snapshot as well as a name for the snapshot. I'd recommend a date format like the example below to easily keep track of when the snapshot was taken. Note that a snapshot covers all the files and directories under the snapshotted directory. Also, you can't create nested snapshots, so be judicious in your selection.
hdfs dfs -createSnapshot /tmp/snapshot_dir/dir1 20190819v1
Once that completes, check the size of the directory again. As you can see, the directory size didn't increase. That's because snapshots are point-in-time logical backups of your data: a pointer is created in the NameNode that links the snapshot to the files on disk. If a deletion happens, the NameNode drops its logical reference to that data, but the data physically remains. The snapshot acts as a secondary reference that can be used to recover the files and restore the NameNode reference. To test this, let's delete a file:
hdfs dfs -rm /tmp/snapshot_dir/dir1/file1.txt
First, make sure the file is removed:
hdfs dfs -ls /tmp/snapshot_dir/dir1/
Now check the directory size:
hdfs dfs -du -h /tmp/snapshot_dir
Notice that while the file size is 0 because the file no longer logically exists, the second number (the replicated size) is still populated. That's because while the file doesn't logically exist, it is still physically present. This is important to remember: files deleted while covered by a snapshot aren't physically deleted until the snapshot holding them is deleted. This can result in lots of "phantom" data taking up valuable HDFS real estate.
Now let's restore the file. To do this, navigate to the hidden .snapshot folder that holds the individual snapshots, located under the directory where the snapshot was taken:
hdfs dfs -ls /tmp/snapshot_dir/dir1/.snapshot
In here you should see the snapshot we took previously. Now we can copy the file out of the snapshot to restore it. Note that the -ptopax flags restore the file with the same timestamps, ownership, permissions, ACLs and XAttrs as the original.
hdfs dfs -cp -ptopax /tmp/snapshot_dir/dir1/.snapshot/20190819v1/file1.txt /tmp/snapshot_dir/dir1
Now that the data has been copied, let's take a look at the directory sizes:
hdfs dfs -du -h /tmp/snapshot_dir
hdfs dfs -ls /tmp/snapshot_dir/dir1/
The file sizes look good, but the replicated size is off the charts. We expect 36 bytes (12 bytes x 3 replicas) but it comes in at 72. That's because while we copied the file out of the snapshot, the snapshot's copy of the file still remains on disk... so now we have two physical copies of the same file, one referenced directly by the NameNode and the other by the snapshot. To remove this phantom data, we must delete the snapshot:
hdfs dfs -deleteSnapshot /tmp/snapshot_dir/dir1 20190819v1
With the snapshot deleted, take a look at the size of the directory:
hdfs dfs -du -h /tmp/snapshot_dir
Phantom data no more! Everything is back in order and has the expected number of bytes. Look for Part 2, where we examine what to look out for when managing multiple snapshots.
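As an addendum: if restores become routine, the -cp -ptopax step shown above lends itself to a tiny wrapper. Here is a minimal sketch of a hypothetical helper that just shells out to the same command:
import subprocess

def restore_from_snapshot(directory, snapshot, filename):
    # copy a file out of the named snapshot back into the live directory,
    # preserving timestamps, ownership, permissions, ACLs and XAttrs (-ptopax)
    src = "{}/.snapshot/{}/{}".format(directory, snapshot, filename)
    subprocess.run(["hdfs", "dfs", "-cp", "-ptopax", src, directory], check=True)

restore_from_snapshot("/tmp/snapshot_dir/dir1", "20190819v1", "file1.txt")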
12-13-2018
07:39 PM
2 Kudos
This article will demonstrate how to install Anaconda on an HDP 3.0.1 instance and how to configure Zeppelin so the Anaconda Python libraries can be used with Apache Spark.
Pre-requisites: the bzip2 library needs to be installed prior to installing Anaconda.
Step 1. Download and Install Anaconda
https://www.anaconda.com/download/#linux
From the link above, download the Anaconda installer. It will be a large bash script, around 600MB in size. Land this file in an appropriate directory such as /tmp. Once downloaded, give the installer executable permissions. The installer can then be executed; run it as a user with sudo privileges. Follow the prompts and accept the license agreements. As a best practice, install Anaconda in a directory other than the default; /opt is where I put my installations. At the very end of the installation it will prompt you to initialize the install in the installing user's .bashrc file. If you have a multi-node environment, repeat this process on the remaining nodes.
Step 2. Configure Zeppelin
Log into Zeppelin and open the interpreters page. Scroll down to the Spark interpreter and look for zeppelin.pyspark.python. This parameter points to the Python binary used when executing Python applications and defaults to the Python binary bundled with the OS; it needs to be updated to the Anaconda Python binary. Navigate to the location where you installed Anaconda; in this example the location is /opt/anaconda2/bin. Add the full path to the Python binary in the /opt/anaconda2/bin directory, save the settings for the interpreter, and finally restart the interpreter. You should be good to go! If you run into any issues, make sure that Zeppelin has the correct file permissions to run the new Python libraries.
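A quick sanity check is to run a trivial paragraph in a Zeppelin note and confirm which Python binary the Spark interpreter is actually using. The path shown in the comment is just an example and will reflect wherever you installed Anaconda:
%pyspark
import sys
# should print the Anaconda binary, e.g. /opt/anaconda2/bin/python, and its version
print(sys.executable)
print(sys.version)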
11-02-2018
03:55 AM
6 Kudos
Druid Kafka Integration Service + Hive
This tutorial will focus on ways of ingesting raw streaming data into Druid and exposing the data in Hive for high-performance analytics.
For reference, this tutorial was built with CentOS 7, HDP 3.0.1 with LLAP enabled, and Kafka installed and running.
This tutorial will be divided into 6 sections:
1. Ambari Configuration
2. Kafka Setup
3. Setup Data Stream
4. Generating and Running the Supervisor Spec
5. Hive Integration
6. Create Streaming Druid Data Sources through Hive
Kafka Integration Service
With the Kafka Ingestion Service, Druid can guarantee that all records received will be processed and stored. That's in contrast to the older push method of Tranquility: if an event arrived after the window period with Tranquility, the record was dropped, which meant a secondary batch process had to be run to true up the data. With the Kafka Ingestion Service, if an event is received late, a segment will be created for it regardless of when it arrived. This is hugely powerful, as it removes the need to run a secondary batch process.
Section I: Ambari Configuration
The Kafka Ingestion Service is still in Technical Preview as of today, so it doesn't come bundled as part of the platform. As such, some minor configuration changes need to be applied.
In Ambari, navigate to the Druid service and click on the Configs tab. Using the filter, search for "druid.extensions.loadList".
Add "druid-kafka-indexing-service" to this list; for example, the value might end up looking like ["druid-datasketches", "druid-kafka-indexing-service"], depending on which extensions are already enabled in your cluster. This parameter tells Druid which extensions to load on startup across the cluster.
Click "Save" and restart any necessary services.
Section II: Setup Kafka
Next, create a Kafka topic for the tutorial. On one of the available Kafka brokers, navigate to the directory containing the client utilities. Use the utility kafka-topics.sh to create a topic on your Kafka cluster.
Section III: Setup Data Stream
The data generator can be downloaded from the link below. It is already configured to generate the data set for the tutorial; just update the topic name and add your Kafka broker address. Once that is done, run the Python script to begin generating events.
Data Generator
If everything is configured properly, you should see events being written to the topic (for example, by tailing it with the kafka-console-consumer utility).
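If you'd rather see the essence of such a generator, the sketch below shows roughly what it boils down to, using the kafka-python package. The broker address, topic name, and record fields here are assumptions drawn from the fields queried later in this article, not the generator's exact schema:
import json
import random
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafkabroker:6667",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

while True:
    # emit a fake sales event roughly matching the fields used in the Hive queries below
    event = {"rep_id": random.randint(0, 15),
             "qty": random.randint(1, 10),
             "trxn_amt": round(random.uniform(10.0, 500.0), 2),
             "discount_amt": round(random.uniform(0.0, 20.0), 2)}
    producer.send("sales_data", event)  # assumed topic name
    producer.flush()
    time.sleep(1)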
Section IV: Generating and Running the Supervisor Spec
Since the middle managers perform the actual indexing of incoming Kafka events, a configuration file needs to be delivered to the overlord node. The overlord takes the configuration file and creates tasks that are executed on the middle manager nodes.
To submit the Kafka ingestion spec, the JSON configuration file must be posted to the overlord via its REST API. If successful, a JSON response containing the ID of the KIS supervisor is returned.
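The submission itself is a single HTTP POST of the spec file to the overlord's supervisor endpoint. Here is a minimal sketch using the Python requests library; the overlord host, port, and spec file name are assumptions for a typical HDP install, so adjust them for your environment:
import requests

# POST the Kafka ingestion spec to the Druid overlord's supervisor endpoint
with open("kafka_ingestion_spec.json", "rb") as spec:
    resp = requests.post("http://overlordhost:8090/druid/indexer/v1/supervisor",
                         data=spec,
                         headers={"Content-Type": "application/json"})
resp.raise_for_status()
print(resp.json())  # on success Druid echoes back the supervisor id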
Once the ingestion spec is submitted, navigate to the Druid overlord console. A new record is now displayed under Running Supervisors with the name of the data source that was defined in the ingestion spec, and there is also a record under Running Tasks.
The coordinator console, however, tells a different story: there isn't an entry for the data source that's seen in the overlord (sales_data_30min is missing).
This is because the coordinator console only shows data managed by historical nodes. Since the Kafka ingestion service was just started and the taskDuration is set to 30 minutes, there won't be an entry here for at least that amount of time. The taskDuration controls how long a task spends reading a Kafka topic before publishing the indexed data to the historical nodes. The data segments are available; they just aren't managed by the historical nodes yet and remain under the domain of the middle managers until the taskDuration expires. The data, however, is still accessible even though it isn't on the historical nodes. In fact, the data segments can be seen by logging into the middle managers themselves and navigating to /apps/druid/tasks.
This shows the currently running task. Going into that directory and navigating to work/persist, the index files can be viewed. Whenever a new query is submitted, instead of going only to the historical nodes, the query is also routed to the middle managers and this directory to pick up the latest streamed data.
After 30 minutes have elapsed, log back into the coordinator console to view the newly added segments on the historical nodes.
Section V: Hive Integration
Now that data is streaming into Kafka and being collected and indexed in Druid, it can be accessed in Hive. Log into Hive and create a table for the newly created Druid data source.
CREATE EXTERNAL TABLE sales_data
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "sales_data_30min");
Next, run a series of quick count(*) queries on the table. Watch how the result set increases with each query: all the new data streaming into Druid is immediately accessible! Behind the scenes the query is being directed to two locations for the count, the historical nodes and the middle managers. The historical nodes hold the committed index segments, while the middle managers hold the most recent data within the given taskDuration.
Even simple aggregations are applied across the historical and real-time data sets; one such query came back in .014 seconds!
Section VI: Create Streaming Druid Data Sources through Hive
As in the previous tutorial, Hive has many core integrations with Druid. One such integration is through external tables: with them, Hive can mimic the Druid supervisor spec that was created earlier, all without having to hand-code any complex JSON documents or interact with a REST API.
The DDL for the Kafka Ingestion Service follows the same external table model as the standard Hive-on-Druid table, with some small differences.
Although the DDL has been executed, there is still an additional step to deploy the Kafka ingestion service: once initially deployed it isn't running, and an ALTER TABLE command must be executed to "turn on" the service.
-- to start the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
-- to stop the service
ALTER TABLE sales_data_kafka SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
Once the ALTER TABLE command to start the Kafka ingestion service has been executed, navigate to the overlord console to see the newly running supervisor and the running middle manager task.
Next, query the table to see the events being added to the real time indexes.
select count(*) from sales_data_kafka;
Run some simple aggregates and watch the data increase with each query.
select sum(discount_amt),sum(qty),rep_id from sales_data_kafka group by rep_id;
One limitation of Druid is that it doesn't do joins. There is some very limited support for runtime lookups, but joining multiple data sources together is not possible today. This is why the integration with Hive is so powerful: any dataset accessible to Hive can be joined with data persisted in Druid, and Hive's cost-based optimizer will push as much processing down to Druid as possible to maximize performance and accelerate query processing. A small example follows. Below is a lookup table for the sales rep_id found in the streaming dataset that was created in the previous step.
create table sales_reps (rep_id int, rep_name string, quota int);
insert into sales_reps (rep_id, rep_name, quota ) values (0 , "Matt Ryan", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (1 , "Matt Bryant", 150000);
insert into sales_reps (rep_id, rep_name, quota ) values (2 , "Julio Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (3 , "Calvin Ridley", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (4 , "Mohammed Sanu", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (5 , "Deion Jones", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (6 , "Tevin Coleman", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (7 , "Keanu Neal", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (8 , "Alex Mac", 120000);
insert into sales_reps (rep_id, rep_name, quota ) values (9 , "Ryan Schraeder", 200000);
insert into sales_reps (rep_id, rep_name, quota ) values (10 , "Robert Alford", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (11 , "Desmond Trufant", 300000);
insert into sales_reps (rep_id, rep_name, quota ) values (12 , "Ricardo Allen", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (13 , "De'Vondre Cambell", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (14 , "Grady Jarett", 100000);
insert into sales_reps (rep_id, rep_name, quota ) values (15 , "Takkarist Mickinley", 100000);
The following query joins the Druid data source with a local Hive table to determine each sales rep's name and quota attainment.
select
sales.qty_sold,
sales.sale_amt,
sr.rep_name,
(sales.sale_amt/quota) as quota_attainment
from (
select sum(qty) as qty_sold,sum(trxn_amt) as sale_amt,cast(rep_id as int)
as rep_id
from sales_data_kafka
group by rep_id) sales
join sales_reps sr on
sr.rep_id=sales.rep_id;
The query ran in 1.4 seconds, and further runs of the same query brought the runtime below 1 second. LLAP keeps local data in a hot cache, which helps accelerate data acquisition.
Conclusion
The integration between Kafka and Druid has greatly simplified real-time streaming architectures. Instead of stitching together multiple tools to build a complex real-time architecture, the combination of Druid + Hive + Kafka can simplify development and streamline integration.
11-02-2018
03:54 AM
5 Kudos
Batch Loading with Druid and Hive Integration
This brief tutorial will walk through the various ways of ingesting raw data into Druid and exposing the data in Hive for analytics. For reference, this tutorial was built with CentOS 7 and HDP 3.0.1 with LLAP enabled. This tutorial will be divided into 6 sections:
1. Data Generation
2. Define the Ingest Spec
3. Submit the Ingest Spec
4. Accessing Data in Hive
5. Double the Data, Double the Fun
6. Loading Data With Hive
Why Druid?
Druid is a distributed columnar data store that is optimized for OLAP-style queries. Data loaded into Druid is pre-aggregated based upon selected dimension and metric fields. The data is further partitioned by time to enable efficient temporal access; this time-based partitioning can be as long as a year and as small as a second, depending on what the business needs are. Today, Druid is tightly integrated with Hive, which helps bring high-speed OLAP analytics to the data lake. While Hive enhancements like LLAP deliver great performance improvements, Hive is still a general-purpose SQL engine. Druid complements Hive by providing a high-performance query accelerator for datasets accessed by OLAP-style workloads.
Section I: Data Generation
The instructions below download a MapReduce job for generating the TPC-H datasets. The script you execute will also create Hive external tables for each TPC-H dataset.
1. Log in to the console as root.
2. Become the hive user: su - hive
3. Download the data generator bundle: wget https://github.com/cartershanklin/sandbox-datagen/archive/master.zip
4. Extract the bundle: tar -zxf datagen.tgz
5. Enter the datagen directory: cd datagen
6. Generate the data: sh datagen.sh 12
Please note that the datagen script's input parameter represents the amount of data (GB) to be generated; for small test systems and VMs, 6GB should be appropriate. The datagen script will also create the appropriate Hive tables for the tutorial. Please ensure that the external table file location matches that of the generated data.
Section II: Define the Ingest Spec
To load batch data into Druid, an ingestion spec must first be created. The ingestion spec tells Druid how to index the data. At first glance, the ingestion spec template looks very complicated, with lots of parameters to configure, but breaking it down into individual components makes it much clearer. Please note that this method of ingest pushes the index-creation burden to the Hadoop cluster. For non-Hadoop Druid deployments, Druid can leverage internal indexing methods (via middle managers) to index the data. For more information see the "Ingestion Methods" section in the following link: Ingestion Methods
Section III: Submit the Ingest Spec
To execute the Druid index build, the ingest spec must be submitted to the indexing service running on the Druid overlord. A simple API call is all that is needed to kick off the job; all that's required is the location of the ingest spec JSON file and the URL of the overlord's indexing endpoint (a scripted example is sketched below). To view the running job, navigate to the overlord UI. In the column named "more" you can view the logs for the indexing process running in real time; this is also a great location for troubleshooting issues or errors that may arise when indexing data. The coordinator console UI will also show some details related to the indexing process.
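For reference, the submission and monitoring can also be scripted. Below is a minimal sketch using the Python requests library; the overlord host, port, and spec file name are assumptions, while /druid/indexer/v1/task is the overlord's batch indexing endpoint:
import time
import requests

OVERLORD = "http://overlordhost:8090"  # assumed overlord address

# submit the batch ingestion spec to the indexing service
with open("lineitem_index_spec.json", "rb") as spec:
    resp = requests.post(OVERLORD + "/druid/indexer/v1/task",
                         data=spec,
                         headers={"Content-Type": "application/json"})
resp.raise_for_status()
task_id = resp.json()["task"]

# poll the task status until the indexing job finishes
while True:
    status = requests.get(OVERLORD + "/druid/indexer/v1/task/" + task_id + "/status").json()
    state = status["status"]["status"]
    print(task_id, state)
    if state != "RUNNING":
        break
    time.sleep(30)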
Navigate to the indexing tab and the indexing service will show that 1 out of 3 indexing slots is occupied by a running job. The same logs that were viewed in the overlord UI are available here as well. When the job has finished, the overlord UI shows the task as "completed" and the coordinator console will show the data source as fully available for access. Clicking on the new index shows a visual representation of the segments loaded and their respective sizes. Selecting the index from the list in the lower left shows the location of the segments in HDFS as well as the defined metrics and dimensions.
Section IV: Accessing Data in Hive
Druid is integrated with Hive via external tables and a Druid storage handler that tells Hive how to talk to Druid. Below is the base DDL for exposing Druid data sources in Hive:
CREATE EXTERNAL TABLE lineitem STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ("druid.datasource" = "druid data source name");
The only information required is the name of the Druid data source, that's it. Upon creation, Hive will import the attributes and metrics of the Druid data source into the table definition. Take a look at the DDL with a SHOW CREATE TABLE command: notice that all the dimensions and metrics defined in the ingestion spec are now part of the DDL for the Hive table. Finally, query the table:
select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;
When the above example was run, the physical query execution took .031 seconds! More time was spent compiling the query than on the actual physical execution. Running the same query with LLAP took nearly 13 seconds; subsequent LLAP runs were in the sub-second range due to caching, but this shows the near-instantaneous performance you get with Druid.
Section V: Double the Data, Double the Fun
Next, experiment with doubling the data to see the performance difference. If this tutorial is being run on a smaller hardware configuration, it is recommended that LLAP be disabled until the Druid processing is completed. This ensures proper resources for dataset creation and indexing and avoids long index build times due to insufficient resources.
select sum(l_quantity_sum),sum(l_tax_sum),l_returnflag from lineitem_druid group by l_returnflag;
The physical execution only took .037 seconds! How is that speed still attainable even after we doubled the data? It all has to do with how Druid stores and serves up data. Druid's performance comes from its ability to store data in a highly compressed columnar format along with embedded bitmap indexes that enable fast searching across millions to billions of records. Data segments are also partitioned by time to enable efficient range-based access. Druid also physically optimizes IO in how it serves data to end applications: data segments are distributed evenly and replicated across the historical nodes of a Druid cluster, and the segments are memory-mapped into local node memory for optimized IO. If your data cannot fit entirely in memory, Druid will perform physical IO to retrieve the segments from local disk; it's recommended that the underlying disk storage for the segment cache be SSD or NVMe. Druid's performance is largely driven by how much compute and memory are available on your historical nodes: more cores and memory to process the data segments will yield greater performance.
Druid's shared-nothing architecture makes it very easy to scale out. If more performance is required, simply scale out the cluster (the historical nodes). Brokers and historical nodes can also employ local (LRU) query caching to cache query results, which accelerates queries by eliminating IO. These caches can live on the brokers and historical nodes or in an external key-value store; if done locally, more memory will increase query cache effectiveness.
Section VI: Loading Data With Hive
Druid can also be loaded with Hive. What was demonstrated in the previous sections was accomplished via the manual ingest method, which is very flexible and can be used to load Druid indexes in many different deployments. But with most data already in Hive in most big data deployments, it often makes more sense to leverage traditional SQL DDL/DML commands to create and load Druid data sources. With this approach, the metrics, filters, transformations, and dimensions can all be created using SQL. A CREATE TABLE AS SELECT statement creates the data source in Druid, loads it, and creates the external table definition in Hive, all in one step. The SQL approach greatly simplifies the process of creating Druid data sources, without having to configure any complex JSON docs. LLAP MUST BE ENABLED FOR DRUID INTEGRATION!
When executing the CREATE TABLE statement, there will be a noticeable pause after the Tez job has completed, so don't be alarmed; the larger your data, the longer the pause. Now run some SQL against the table:
select sum(l_quantity),sum(l_tax),l_returnflag from lineitem_druid_internal group by l_returnflag;
For loading incremental data, a simple INSERT INTO command is all that's required. This makes it incredibly easy to incrementally load Druid and is far easier than customizing ingestion specs.
INSERT INTO lineitem_druid_internal SELECT cast(L_SHIPDATE as timestamp) as `__time`, cast(L_ORDERKEY as string), L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPMODE, L_COMMITDATE, L_RECEIPTDATE, L_EXTENDEDPRICE * (1 - L_DISCOUNT) as SUM_DISC_PRICE, L_EXTENDEDPRICE * (1 - L_DISCOUNT) * (1 + L_TAX) as SUM_CHARGE FROM lineitem WHERE L_SHIPDATE = '1993-05-01';
Lastly, join the Druid lineitem table with the orders table stored in Hive. It takes a little longer since we're federating the query across multiple systems, but it shows the flexibility of Hive and the intelligence of the CBO.
SELECT SUM(SUM_CHARGE) AS CHARGE, O_CLERK FROM lineitem_druid_internal JOIN orders ON cast(orders.O_ORDERKEY as string)=lineitem_druid_internal.L_ORDERKEY WHERE `__TIME` = '1993-09-01' GROUP BY O_CLERK ORDER BY CHARGE DESC LIMIT 10;
Conclusion
The recent advancements in HDP 2.6.x and 3.x have really simplified the process of creating Druid data sources and loading them incrementally. While defining ingestion specs is simple enough, Hive, through the use of SQL, dramatically lowers the barrier to entry for users just getting started with Druid. Now users can take their most demanding OLAP workloads in Hive and easily transpose them onto a platform designed for high-performance analytics.