Created on 01-17-2020 07:48 PM - edited 01-21-2020 09:10 PM
In this article we’ll take an end-to-end look at the execution of a Hive query, focusing on query execution with both Hive on Tez and Hive on Map Reduce.
We will assume that our data includes customer data, so we will want to apply granular access control down to the column level to secure PCI and PII data. This means we will be using Hive in conjunction with Apache Ranger (the standard open source Apache project for securing data in Hive).
1) Client issues the SQL Query
The query will be issued from a client such as a JDBC client or Beeline. JDBC clients include standard BI tools such as Tableau. Beeline is the supported CLI for Hive 2 and 3, and it also uses a JDBC connection to connect to the remote HiveServer2. The JDBC URL for these connections can be retrieved from the CDP Data Warehouse dashboard, as shown below:
For HDP, this URL is available within Ambari on the Hive service summary page, shown below.
2) Query Parsing and Access Control
Once the query has been issued by the client and received by HiveServer2 (HS2), it is parsed and validated against Ranger policies to enforce access control based on the user’s authorization to the data queried. For this, HS2 uses the Hive Ranger plugin (hosted on the HS2 server) to determine whether the authenticated user has permissions on the columns, tables, etc. referenced in the query.
1) If the user does not have permissions on some of the columns, etc., the query is rewritten to strip the unauthorized resources and preserve the authorized ones. For example, if an authenticated user ran the simple query:
SELECT FirstName, LastName, SSN, CC FROM customer
and they were not authorized to access the SSN and CC columns, their query would be rewritten as
SELECT FirstName, LastName FROM customer
If the user has permissions for all the columns, there is no need to rewrite the query.
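The column-stripping behaviour described above can be sketched in a few lines of Python. This is a minimal illustration only: the policy is modelled as a plain set of allowed column names per user, and the function names `rewrite_projection` and `build_select` are hypothetical. Ranger's real policy model and the HS2 rewrite path are considerably richer.

```python
def rewrite_projection(columns, allowed):
    """Keep only the projected columns the user may read (case-insensitive),
    preserving the original column order."""
    allowed_lower = {c.lower() for c in allowed}
    return [c for c in columns if c.lower() in allowed_lower]


def build_select(columns, table):
    # Joining with ", " means no trailing comma is left behind after stripping.
    if not columns:
        raise PermissionError("no authorized columns remain in the query")
    return f"SELECT {', '.join(columns)} FROM {table}"


# Hypothetical policy: this user may read the name columns but not SSN or CC.
allowed = {"FirstName", "LastName"}
requested = ["FirstName", "LastName", "SSN", "CC"]
print(build_select(rewrite_projection(requested, allowed), "customer"))
# SELECT FirstName, LastName FROM customer
```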
3) Query Planning and Cost Based Optimization
Once the query is parsed, a logical query plan is generated for use by the query execution engine (in this case, either Tez or the traditional Map Reduce engine). Hive performs both logical and physical optimizations, including partition pruning, projection pruning, and predicate pushdown. Hive uses Apache Calcite to provide Cost Based Optimization (CBO).
Cost Based Optimization (CBO)
Query performance can be improved through Cost Based Optimization (CBO) techniques, including join reordering. Knowing the number of rows in each table and the cardinality of each column, the optimizer can order joins so as to minimize the total number of rows joined. The simplest example is to perform the joins that produce the fewest rows first, so that later joins operate on smaller intermediate results. This can reduce the number of Tez (map/reduce) tasks that need to be spawned for a given query.
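The idea behind join reordering can be illustrated with a simple greedy heuristic in Python. The heuristic starts from the smallest input and repeatedly adds the table that keeps the estimated intermediate result smallest. This is a sketch under stated assumptions: the row counts, selectivities and cost model (sum of intermediate row counts for a left-deep plan) are invented. Calcite's planner explores many more plan shapes and uses a richer cost model.

```python
def join_cost(order, rows, sel):
    """Sum of estimated intermediate result sizes for a left-deep join order.
    Pairs missing from `sel` are treated as cross products (selectivity 1)."""
    current = rows[order[0]]
    total = 0.0
    joined = [order[0]]
    for table in order[1:]:
        s = 1.0
        for other in joined:
            s *= sel.get(frozenset((other, table)), 1.0)
        current = current * rows[table] * s  # estimated rows after this join
        total += current
        joined.append(table)
    return total


def greedy_order(rows, sel):
    """Start from the smallest input, then repeatedly add the table that keeps
    the estimated cost lowest (ties broken alphabetically)."""
    remaining = sorted(rows)
    order = [min(remaining, key=lambda t: rows[t])]
    remaining.remove(order[0])
    while remaining:
        best = min(remaining, key=lambda t: join_cost(order + [t], rows, sel))
        order.append(best)
        remaining.remove(best)
    return order


# Hypothetical statistics: one store survives a filter; each sale matches
# one of 100 stores and one of 500,000 customers.
rows = {"sales": 10_000_000, "customer": 500_000, "store": 1}
sel = {frozenset(("sales", "customer")): 1 / 500_000,
       frozenset(("sales", "store")): 1 / 100}

order = greedy_order(rows, sel)
print(order)  # ['store', 'sales', 'customer']
print(f"greedy cost: {join_cost(order, rows, sel):,.0f}")
print(f"written-order cost: {join_cost(['sales', 'customer', 'store'], rows, sel):,.0f}")
```

Joining the filtered `store` table first shrinks `sales` before the customer join. Joining in the written order instead materializes roughly ten million intermediate rows.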
For Cost Based Optimization (Calcite) to be used, column-level statistics should be available in Hive. These can be collected with the ANALYZE TABLE statement for existing tables, and by setting three configuration parameters in Hive (detailed below):
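hive.stats.autogather: automatically gathers table-level (basic) statistics when data is written, for example by INSERT OVERWRITE.
hive.stats.fetch.column.stats: fetches column-level statistics from the metastore so they can be used to annotate the query plan.
hive.compute.query.using.stats: allows Hive to answer some simple aggregate queries, such as count(*), directly from stored statistics.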
Further details on the configuration can be found here:
In CDP Public Cloud, CBO is enabled by default, and the above configuration parameters are set to true (with the option of disabling them).
Column-level statistics are stored in the Hive Metastore (HMS), which is backed by an RDBMS such as MySQL or PostgreSQL. (The earlier HBase-backed metastore is no longer supported in Hive 3.)
Once a query plan has been generated the plan is delegated to Tez (or the traditional Map Reduce execution engine) for execution.
4) Query Coordination with Map Reduce or Tez:
Map Reduce and Tez are distributed execution frameworks used by Hive (and other projects). Each takes a query plan and coordinates its execution across different (map and reduce) tasks. These tasks can be represented as a directed acyclic graph (DAG), as shown below.
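To make the DAG idea concrete, the following Python sketch models a query plan as vertices with dependency edges. It groups them into stages using Kahn's topological sort. Vertices in the same stage have no dependencies on each other and could run concurrently. The vertex names imitate the style of Tez plans, but the plan itself is hypothetical, and a real AM's scheduling is considerably more nuanced.

```python
from collections import defaultdict


def execution_stages(edges, vertices):
    """Group DAG vertices into stages with Kahn's algorithm, level by level.
    Raises ValueError if the graph contains a cycle."""
    indegree = {v: 0 for v in vertices}
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
    ready = sorted(v for v, d in indegree.items() if d == 0)
    stages, seen = [], 0
    while ready:
        stages.append(ready)
        seen += len(ready)
        nxt = []
        for v in ready:
            for c in children[v]:
                indegree[c] -= 1
                if indegree[c] == 0:  # all inputs of c are now scheduled
                    nxt.append(c)
        ready = sorted(nxt)
    if seen != len(vertices):
        raise ValueError("plan is not acyclic")
    return stages


# Hypothetical plan: two table scans feed a join, whose output is aggregated.
vertices = ["Map 1", "Map 2", "Reducer 3", "Reducer 4"]
edges = [("Map 1", "Reducer 3"), ("Map 2", "Reducer 3"), ("Reducer 3", "Reducer 4")]
print(execution_stages(edges, vertices))
# [['Map 1', 'Map 2'], ['Reducer 3'], ['Reducer 4']]
```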
The query coordination is performed by the application master (AM), and this is the case both for queries executed with the traditional Hive map reduce engine, and with the Tez engine.
Predicate (and projection) pushdown:
Once a query plan is generated, it is delegated to the Application Master (AM) for coordination of execution. Based on the query plan, the AM must first identify the folders that contain the ORC files relevant to the query. Any predicates on the partition column limit the partitions, and therefore the folders, that Tez needs to read from. For the remaining partitions, the AM must next request from the NameNode an enumeration of the ORC files in those folders, together with the block locations of those files' footers.
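Partition pruning can be sketched in Python as follows. The sketch assumes a table partitioned by a hypothetical `zip_code` column, with one directory per partition value in Hive's `column=value` naming convention. The directory listing is an in-memory dictionary standing in for what the NameNode would return, and all paths and file names are made up.

```python
def prune_partitions(partition_dirs, column, predicate):
    """Keep partition directories whose `column=value` name satisfies
    `predicate` (called with the value as a string)."""
    selected = []
    for path in partition_dirs:
        name = path.rstrip("/").rsplit("/", 1)[-1]  # e.g. "zip_code=94611"
        key, _, value = name.partition("=")
        if key == column and predicate(value):
            selected.append(path)
    return selected


def files_to_read(listing, selected_dirs):
    """Enumerate ORC files only in the surviving partitions (on HDFS this
    listing would come from the NameNode)."""
    return [f"{d}/{f}" for d in selected_dirs for f in listing.get(d, [])]


# Hypothetical table layout
listing = {
    "/warehouse/customer/zip_code=94610": ["000000_0.orc"],
    "/warehouse/customer/zip_code=94611": ["000000_0.orc", "000001_0.orc"],
    "/warehouse/customer/zip_code=94612": ["000000_0.orc"],
}
selected = prune_partitions(list(listing), "zip_code", lambda v: v == "94611")
print(files_to_read(listing, selected))
# ['/warehouse/customer/zip_code=94611/000000_0.orc', '/warehouse/customer/zip_code=94611/000001_0.orc']
```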
ORC footers contain file- and stripe-level statistics that the AM can use to determine which stripes of each ORC file need to be read by mappers. The min/max values and null counts in these statistics can be used to eliminate unnecessary stripe reads based on the query's predicates. Bloom filters, which are stored alongside the row indexes as described below, allow further elimination. This is the first mechanism by which predicate pushdown is performed.
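The stripe-level check can be sketched in Python, assuming each stripe carries min/max and has-null statistics for one integer column. The `StripeStats` class is a hypothetical stand-in for statistics that an ORC reader library would decode from the file footer. The checks are conservative: a stripe is skipped only when its statistics prove that no row can match.

```python
from dataclasses import dataclass


@dataclass
class StripeStats:
    """Hypothetical per-stripe statistics for a single integer column."""
    minimum: int
    maximum: int
    has_null: bool


def may_match_equal(stats, value):
    # The stripe can be skipped only if `value` lies outside [min, max].
    return stats.minimum <= value <= stats.maximum


def may_match_range(stats, low, high):
    # Keep the stripe if [min, max] overlaps the query range [low, high].
    return stats.maximum >= low and stats.minimum <= high


def may_match_is_null(stats):
    # IS NULL can only match if the stripe recorded at least one null.
    return stats.has_null


stripes = [
    StripeStats(94101, 94133, False),
    StripeStats(94501, 94620, False),
    StripeStats(94601, 94612, True),
    StripeStats(95001, 95120, False),
]

print([i for i, s in enumerate(stripes) if may_match_equal(s, 94611)])  # [1, 2]
print([i for i, s in enumerate(stripes) if may_match_range(s, 95000, 95050)])  # [3]
print([i for i, s in enumerate(stripes) if may_match_is_null(s)])  # [2]
```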
Further elimination of disk IO can be achieved by eliminating the read of unnecessary columns. For this, the SELECT clause is used to determine which columns need to be read (projection pushdown).
Predicate and projection pushdown improve query performance by reducing disk scans and reducing the number of mappers required to execute a query. By avoiding full table scans and the need to read all columns, a single mapper can perform the work of many mappers. To provide a simple example, consider the query
SELECT first_name, last_name FROM customer WHERE zip_code = 94611 AND customer_segment = 'champion' AND lifestyle = 'active' AND household = 'empty_nester'
run against a denormalized customer table with 20 columns.
Applying projection pushdown (only reading the streams for first and last name) will reduce the disk reads from 20 columns (streams) to 2 columns, per stripe.
Applying predicate pushdown with bloom filters will eliminate disk reads for all stripes that contain no customers meeting every criterion: being in the 94611 zip code, having an active lifestyle, and being empty nesters in the champion customer segment. If these predicates eliminate 50% of the stripe reads, then only half the mappers are required. Taken together, and assuming columns of roughly equal size, projection and predicate pushdown reduce the data read to one twentieth. This allows a single mapper to perform the work of up to 20 mappers.
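The arithmetic behind this estimate can be checked with a few lines of Python. It assumes equal-size columns and evenly sized stripes. It also follows the example's simplification of reading only the two projected columns. In practice the four predicate columns (zip_code, customer_segment, lifestyle, household) must also be read from the surviving stripes to filter individual rows. That raises the fraction read from 1/20 to 3/20.

```python
from fractions import Fraction


def io_fraction(total_columns, columns_read, stripe_fraction_read):
    """Fraction of the original bytes a query reads, assuming equal-size
    columns and evenly sized stripes."""
    return Fraction(columns_read, total_columns) * stripe_fraction_read


half_of_stripes = Fraction(1, 2)  # predicates eliminate 50% of stripe reads

# The example's simplification: only first_name and last_name are read.
projected_only = io_fraction(20, 2, half_of_stripes)
# Also reading the four predicate columns needed to filter rows.
with_predicate_columns = io_fraction(20, 6, half_of_stripes)

print(projected_only)          # 1/20
print(with_predicate_columns)  # 3/20
```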
The impact of predicate and projection pushdown is significant for both HDDs and NVMe drives in the traditional model of local storage. This raises the question of how predicate and projection pushdown work with cloud-based object storage. Because Tez can read files at specific byte offsets in storage layers such as S3, predicate pushdown is possible for these storage layers too.
To summarize, it is the Tez AM that performs the first step in predicate pushdown, by determining which stripes must be read based on ORC file footer statistics. The AM then delegates the stripe reads to mapper tasks.
More detail on the ORC file format can be found at the links below:
Mappers are responsible for executing the stripe reads. They retrieve the stripe footer for each stripe they will read, which contains the offsets of the column data they must read in that stripe. Column data is stored as streams in the ORC file format: one column (e.g. first name) is written first in the stripe, followed by the next column, and so on. In uncompressed form this might look like:
Stripe Index (Header)
-----------------------------
Andrew, John, Sandy, Steve
Miller, Smith, White, Carpenter
-----------------------------
Stripe Footer
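This layout can be mimicked in Python with an in-memory buffer. Column streams are written back to back, and a footer-like dictionary records each stream's offset and length. A reader then seeks directly to only the streams it needs. The encoding (comma-joined UTF-8 text) and the `footer` structure are simplifications for illustration. Real ORC streams are type-specifically encoded, optionally compressed, and described by protobuf metadata. The same offset-based access is what allows ranged reads against object stores such as S3.

```python
import io


def write_stripe(columns):
    """Write each column as a contiguous stream; return (bytes, footer),
    where the footer maps column name -> (offset, length)."""
    buf = io.BytesIO()
    footer = {}
    for name, values in columns.items():
        data = ",".join(values).encode("utf-8")
        footer[name] = (buf.tell(), len(data))
        buf.write(data)
    return buf.getvalue(), footer


def read_columns(stripe_bytes, footer, wanted):
    """Read only the requested column streams by seeking to their offsets."""
    f = io.BytesIO(stripe_bytes)
    out = {}
    for name in wanted:
        offset, length = footer[name]
        f.seek(offset)  # analogous to a ranged read against object storage
        out[name] = f.read(length).decode("utf-8").split(",")
    return out


stripe, footer = write_stripe({
    "first_name": ["Andrew", "John", "Sandy", "Steve"],
    "last_name": ["Miller", "Smith", "White", "Carpenter"],
    "zip_code": ["94611", "94610", "94611", "94612"],
})
print(read_columns(stripe, footer, ["last_name"]))
# {'last_name': ['Miller', 'Smith', 'White', 'Carpenter']}
```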
In reality there are multiple streams per column, both for the indexes and for the data. These differ according to data type, but generally include at a minimum a stream indicating whether each value is present and a stream holding the values themselves. The index may also include a bloom filter stream. Leveraging the index streams allows further predicate pushdown at the level of the mappers. Statistics are also kept for strides (row groups of 10,000 rows by default), which enables additional predicate pushdown. Finally, it's worth bearing in mind that column data is encoded according to the column's data type and can additionally be compressed, which further reduces IO overhead.
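Row-group (stride) level pruning can be sketched in Python. Each row group in this hypothetical index stores min/max values and a small bloom filter, and a row group is read only if both checks say a match is possible. The bloom filter derives its bit positions from SHA-256 purely for illustration; ORC's bloom filters use their own hashing and sizing. The stride is also shrunk from 10,000 rows to 4 so the example stays readable.

```python
import hashlib


class BloomFilter:
    """Illustrative bloom filter; bit positions are derived from SHA-256,
    not from ORC's own hash function."""

    def __init__(self, num_bits=256, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # Python int used as a bit array

    def _positions(self, value):
        digest = hashlib.sha256(str(value).encode("utf-8")).digest()
        # Use 4 bytes of the digest per hash function.
        return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % self.num_bits
                for i in range(self.num_hashes)]

    def add(self, value):
        for p in self._positions(value):
            self.bits |= 1 << p

    def might_contain(self, value):
        # False: definitely absent. True: possibly present (false positives occur).
        return all((self.bits >> p) & 1 for p in self._positions(value))


def build_row_index(values, stride):
    """One entry per row group of `stride` rows: min, max and a bloom filter."""
    index = []
    for start in range(0, len(values), stride):
        group = values[start:start + stride]
        bloom = BloomFilter()
        for v in group:
            bloom.add(v)
        index.append({"min": min(group), "max": max(group), "bloom": bloom})
    return index


def row_groups_to_read(index, value):
    """Row groups that may contain `value` (equality predicate)."""
    return [i for i, g in enumerate(index)
            if g["min"] <= value <= g["max"] and g["bloom"].might_contain(value)]


zip_codes = [94101, 94105, 94110, 94133,   # row group 0: out of min/max range
             94601, 94605, 94620, 94630,   # row group 1: in range, 94611 absent
             94607, 94611, 94611, 94612]   # row group 2: contains 94611
index = build_row_index(zip_codes, stride=4)
print(row_groups_to_read(index, 94611))
```

Row group 0 is always skipped by the min/max check, and row group 2 is always read. Row group 1 passes the min/max check and is skipped unless its bloom filter happens to return a false positive.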
Tez vs. Map Reduce
Tez offers performance gains over traditional Hive on Map Reduce, through the elimination of disk IO between tasks in the query execution DAG. With the traditional Map Reduce execution engine, the output of each task is written to storage, and then read from storage by downstream tasks. Tez avoids this by facilitating access to shared memory across tasks, including RDMA (where relevant).
The ability to share memory between tasks improves data throughput by more than an order of magnitude for tasks located on the same node. Current DDR4 memory can transfer data at 90 GB/s, versus 3.6 GB/s for locally attached NVMe drives and 120 MB/s for HDDs, so memory is roughly 25 times faster than NVMe and 750 times faster than HDDs.
Tez DAG tasks are distributed across many nodes in a cluster, so throughput gains are moderated: not all tasks that share data will be running on the same node. This is partially mitigated by support for RDMA.
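The relative speed-ups follow directly from the figures quoted above, which are representative peak numbers rather than measurements from a specific cluster:

```python
# Throughput figures quoted in the text, in GB/s.
throughput_gbps = {"DDR4 memory": 90.0, "local NVMe": 3.6, "HDD": 0.120}


def speedup(fast, slow):
    """How many times faster `fast` transfers data than `slow`."""
    return throughput_gbps[fast] / throughput_gbps[slow]


for device in ("local NVMe", "HDD"):
    print(f"memory vs {device}: {speedup('DDR4 memory', device):.0f}x")
# memory vs local NVMe: 25x
# memory vs HDD: 750x
```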
Additional optimizations in Tez:
Tez further optimizes query execution by defining the DAG at runtime. The DAG is generated based on block locations and the locations of the YARN containers allocated by the Resource Manager. Tez also makes it possible to reuse containers across tasks and queries, avoiding the overhead of container spin-up. There are many additional optimizations performed by Tez, details of which can be found here:
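The benefit of container reuse can be illustrated with a toy pool in Python. Each task first tries to take an idle (warm) container and only requests a new one when none is available. The `ContainerPool` class and the wave-based scheduling (each wave of tasks finishes before the next starts) are hypothetical simplifications. In Tez this behaviour is controlled by the `tez.am.container.reuse.enabled` setting, and the containers themselves are allocated by YARN's Resource Manager.

```python
class ContainerPool:
    """Toy model of container reuse: idle containers are handed to new tasks
    instead of launching fresh ones."""

    def __init__(self, reuse=True):
        self.reuse = reuse
        self.idle = []
        self.launched = 0

    def acquire(self):
        if self.reuse and self.idle:
            return self.idle.pop()  # warm container, no spin-up cost
        self.launched += 1  # a new container has to be spun up
        return f"container_{self.launched}"

    def release(self, container):
        if self.reuse:
            self.idle.append(container)


def run_tasks(num_tasks, concurrency, reuse):
    """Run tasks in waves of `concurrency`; return the number of containers launched."""
    pool = ContainerPool(reuse)
    for start in range(0, num_tasks, concurrency):
        wave = [pool.acquire() for _ in range(min(concurrency, num_tasks - start))]
        for container in wave:
            pool.release(container)
    return pool.launched


print(run_tasks(100, 10, reuse=False))  # 100
print(run_tasks(100, 10, reuse=True))   # 10
```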