09-12-2018
11:22 PM
Thank you @jcamachorodriguez for your reply and for creating HIVE-17668. I'm glad to know that I'm not alone :) I will try to analyze the HS2 log files in order to dig into the CBO problem. Although I've also come to the same conclusion that exposing the window function to the outermost query may alter the results, I still can't come up with an example where pushing the predicates changes the results depending on whether a window function is present in the select statement or not... If you have an example showing this behavior, I would be grateful. Concerning FIRST_VALUE, as far as I understand, it will not meet my needs: it will not filter the records. If I have 2 versions of the same record id, using FIRST_VALUE (on all the columns) will return both records with the correct values, and I would then have to apply a DISTINCT. I'm not sure it will be more efficient than a ROW_NUMBER + filter; correct me if I'm wrong...
Regards, Christian
09-12-2018
11:22 PM
Hello,
I am facing a problem with predicate pushdown: it does not work once I add a window function to the selected columns.
My question is similar to this question. It is said that it has been resolved in Hive 2.1 (HIVE-12808). I have tested with Hive 2.1 and I am still getting the same problem. Below, you'll see how I reproduce the issue.
I'm using the Tez engine and I've set all of the variables below before executing my queries.
set hive.cbo.enable=true;
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
I've created two tables (t & u), stored as ORC, inserted some data into them and computed statistics on all columns.
DROP TABLE IF EXISTS t PURGE;
CREATE TABLE t (
c1 string,
c2 INT,
c3 string,
c4 string
)
stored AS orc ;
INSERT INTO t VALUES
('1',0, '1', '2016-01-01'),
('2',1, '1', '2016-01-01'),
('3',2, '2', '2016-01-02'),
('4',3, '2', '2016-01-02'),
('1',0, '1', '2016-01-01'),
('2',1, '1', '2016-01-01'),
('3',2, '2', '2016-01-02'),
('4',3, '2', '2016-01-02'),
('1',0, '1', '2016-01-01'),
('2',2, '1', '2016-01-01'),
('3',4, '2', '2016-01-02'),
('4',6, '2', '2016-01-02'),
('1',0, '1', '2016-01-01'),
('2',1, '1', '2016-01-01'),
('3',2, '2', '2016-01-02'),
('4',3, '2', '2016-01-02'),
('1',0, '1', '2016-01-01'),
('2',2, '1', '2016-01-01'),
('3',4, '2', '2016-01-02'),
('4',6, '2', '2016-01-02');
ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS;
DROP TABLE IF EXISTS u PURGE ;
CREATE TABLE u (
d1 string,
d2 INT,
d3 string,
d4 string
)
stored AS orc ;
INSERT INTO u
VALUES('1', 0, '1', '2016-01-01'),
('2', 1, '1', '2016-01-01'),
('3', 2, '2', '2016-01-02'),
('4', 3, '2', '2016-01-02');
INSERT INTO U
VALUES('1', 4, '1', '2016-01-01'),
('2', 5, '1', '2016-01-01'),
('3', 6, '2', '2016-01-02'),
('4', 7, '2', '2016-01-02');
ANALYZE TABLE u COMPUTE STATISTICS FOR COLUMNS;
Table t contains 20 rows and table u contains 8 rows.
When I execute the query below (Query1):
EXPLAIN
SELECT *
FROM
(
  SELECT c1,c2,c3,c4,d1,d2,d3,d4
  FROM
  (
    SELECT *
    FROM
    (
      SELECT *
      FROM (
        SELECT *
        FROM t
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1=c1
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7 ;
Predicates are correctly pushed down to the inner sub-queries, as shown in the execution plan below:
Plan not optimized by CBO.
Vertex dependency in root stage
Map 1 <- Map 2 (BROADCAST_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Map 1 vectorized
File Output Operator [FS_569514]
compressed:true
Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
Select Operator [OP_569513]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
Map Join Operator [MAPJOIN_569512]
| condition map:[{"":"Inner Join 0 to 1"}]
| HybridGraceHashJoin:true
| keys:{"Map 2":"d1 (type: string)","Map 1":"_col0 (type: string)"}
| outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col6","_col7"]
| Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
|<-Map 2 [BROADCAST_EDGE] vectorized
| Reduce Output Operator [RS_569509]
| key expressions:d1 (type: string)
| Map-reduce partition columns:d1 (type: string)
| sort order:+
| Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
| value expressions:d3 (type: string), d4 (type: string)
| Filter Operator [FIL_569508]
| predicate:((d1 is not null and (d2 = 7)) and (d1 = 4)) (type: boolean)
| Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
| TableScan [TS_569491]
| alias:u
| Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
|<-Select Operator [OP_569511]
outputColumnNames:["_col0","_col1","_col2","_col3"]
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator [FIL_569510]
predicate:(c1 = 4) (type: boolean)
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
TableScan [TS_569488]
alias:t
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
The first weird behavior: based on the filter predicate on table t (predicate:(c1 = 4)), the Filter Operator in the execution plan reports 4 rows instead of the 5 matching rows available in table t. This is not my main problem; the returned result set contains the correct number of rows: 5.
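The actual counts behind this observation can be checked directly against the sample data loaded above. This is only a sanity check; the aliases total_rows and matching_rows are illustrative names, not part of the original queries. Note that the "Num rows" values in an EXPLAIN plan are figures the optimizer derives from statistics, so they are not guaranteed to equal what these queries return.

```sql
-- Total number of rows in t (20 with the INSERT shown above).
SELECT COUNT(*) AS total_rows
FROM t;

-- Rows of t that satisfy the predicate seen in the plan (5 with the data above).
SELECT COUNT(*) AS matching_rows
FROM t
WHERE c1 = '4';
```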
My main problem is that once I modify my query by adding window functions to the select statement, the predicates are no longer pushed down to the sub-queries.
The slightly modified query below differs only by the ROW_NUMBER() window functions added to the select statements (Query2):
EXPLAIN
SELECT *
FROM
(
  SELECT c1,c2,c3,c4,d1,d2,d3,d4
  FROM
  (
    SELECT * , ROW_NUMBER() OVER(PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM
    (
      SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER(PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1=c1
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7 ;
Predicates are NOT pushed down to the inner sub-queries. Tables t & u are fully scanned and fetched, a MAP JOIN is done, and the filter is applied only at the end using the predicates, as shown in the execution plan below.
Plan not optimized by CBO.
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 3
File Output Operator [FS_569705]
compressed:true
Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
Select Operator [SEL_569700]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator [FIL_569708]
predicate:((_col6 = 7) and (_col0 = 4)) (type: boolean)
Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
PTF Operator [PTF_569699]
Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col6(DESC)","name:":"windowingtablefunction","partition by:":"_col5"}]
Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator [SEL_569698]
| outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
| Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
|<-Reducer 2 [SIMPLE_EDGE]
Reduce Output Operator [RS_569697]
key expressions:_col5 (type: string), _col6 (type: int)
Map-reduce partition columns:_col5 (type: string)
sort order:+-
Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
value expressions:_col0 (type: string), _col1 (type: int), _col2 (type: string), _col3 (type: string), _col7 (type: string), _col8 (type: string)
Map Join Operator [MAPJOIN_569711]
| condition map:[{"":"Inner Join 0 to 1"}]
| HybridGraceHashJoin:true
| keys:{"Reducer 2":"_col0 (type: string)","Map 4":"d1 (type: string)"}
| outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
| Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
|<-Map 4 [BROADCAST_EDGE] vectorized
| Reduce Output Operator [RS_569715]
| key expressions:d1 (type: string)
| Map-reduce partition columns:d1 (type: string)
| sort order:+
| Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
| value expressions:d2 (type: int), d3 (type: string), d4 (type: string)
| Filter Operator [FIL_569714]
| predicate:d1 is not null (type: boolean)
| Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
| TableScan [TS_569691]
| alias:u
| Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
|<-Filter Operator [FIL_569709]
predicate:_col0 is not null (type: boolean)
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
PTF Operator [PTF_569687]
Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col2(DESC)","name:":"windowingtablefunction","partition by:":"_col0, _col1"}]
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator [SEL_569686]
| outputColumnNames:["_col0","_col1","_col2","_col3"]
| Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
|<-Map 1 [SIMPLE_EDGE] vectorized
Reduce Output Operator [RS_569713]
key expressions:c1 (type: string), c2 (type: int), c3 (type: string)
Map-reduce partition columns:c1 (type: string), c2 (type: int)
sort order:++-
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
value expressions:c4 (type: string)
TableScan [TS_569684]
alias:t
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
Another weird behavior in the plan: table t has 20 rows of which 5 have c1=4, and table u has 8 rows of which 1 has (d2 = 7) and (d1 = 4), yet the MAP JOIN reports 32 rows...
As I understand it, the window function is applied to the filtered result, i.e. to what is specified in the WHERE clause, and not to the whole table. Thus the predicates should be pushed down to the innermost sub-query as in Query1, regardless of whether a window function is specified or not.
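A minimal sketch of when a filter can and cannot be moved below a window function, assuming table u as loaded above (the aliases rn and above_window are illustrative and not part of the original queries). A predicate on a PARTITION BY column only removes whole partitions, so the numbering of the surviving rows does not change. A predicate on any other column removes rows inside a partition and can change the numbering.

```sql
-- Filter on the partition column d1: applying it after the window function ...
SELECT d1, d2, rn
FROM (
  SELECT d1, d2, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rn
  FROM u
) above_window
WHERE d1 = '4';

-- ... or before it gives the same rows and the same row numbers.
SELECT d1, d2, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rn
FROM u
WHERE d1 = '4';

-- Filter on d2, which is not a partition column: applied after the window
-- function, the row (d1 = '4', d2 = 3) keeps rn = 2 because d2 = 7 ranks first.
SELECT d1, d2, rn
FROM (
  SELECT d1, d2, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rn
  FROM u
) above_window
WHERE d2 = 3;

-- Applied before the window function, the same row is alone in its
-- partition and gets rn = 1.
SELECT d1, d2, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rn
FROM u
WHERE d2 = 3;
```

In Query2 the row numbers rwnbT and rwnbU are projected away before the outer WHERE clause, so both filter placements should return the same rows there; the difference only matters when the window result itself is selected or filtered on.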
I've tried "manually" setting the predicates in the query as shown below (Query3):
EXPLAIN
SELECT *
FROM
(
  SELECT c1,c2,c3,c4,d1,d2,d3,d4
  FROM
  (
    SELECT * , ROW_NUMBER() OVER(PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM
    (
      SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER(PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
        WHERE c1=4
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1=c1
    WHERE d2=7 and d1=4
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7 ;
The execution plan is as expected: similar to the one of Query1. Both tables are filtered while being scanned, even though a window function is specified in the select statement (as in Query2), and the join is applied to the filtered results rather than before filtering. I still don't understand why 4 rows are fetched from t and not 5, given that the query ends up returning 5 rows...
Here is the execution plan for Query3:
Plan not optimized by CBO.
Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
Stage-0
Fetch Operator
limit:-1
Stage-1
Reducer 3
File Output Operator [FS_569841]
compressed:true
Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
Select Operator [SEL_569836]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator [FIL_569844]
predicate:((_col6 = 7) and (_col0 = 4)) (type: boolean)
Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
PTF Operator [PTF_569835]
Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col6(DESC)","name:":"windowingtablefunction","partition by:":"_col5"}]
Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator [SEL_569834]
| outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
| Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
|<-Reducer 2 [SIMPLE_EDGE]
Reduce Output Operator [RS_569833]
key expressions:_col5 (type: string), 7 (type: int)
Map-reduce partition columns:_col5 (type: string)
sort order:+-
Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
value expressions:_col0 (type: string), _col1 (type: int), _col2 (type: string), _col3 (type: string), _col7 (type: string), _col8 (type: string)
Map Join Operator [MAPJOIN_569848]
| condition map:[{"":"Inner Join 0 to 1"}]
| HybridGraceHashJoin:true
| keys:{"Reducer 2":"_col0 (type: string)","Map 4":"d1 (type: string)"}
| outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col7","_col8"]
| Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
|<-Map 4 [BROADCAST_EDGE] vectorized
| Reduce Output Operator [RS_569853]
| key expressions:d1 (type: string)
| Map-reduce partition columns:d1 (type: string)
| sort order:+
| Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
| value expressions:d3 (type: string), d4 (type: string)
| Filter Operator [FIL_569852]
| predicate:((d1 is not null and (d2 = 7)) and (d1 = 4)) (type: boolean)
| Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
| TableScan [TS_569826]
| alias:u
| Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
|<-Filter Operator [FIL_569845]
predicate:(_col0 = 4) (type: boolean)
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
PTF Operator [PTF_569822]
Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col2(DESC)","name:":"windowingtablefunction","partition by:":"_col0, _col1"}]
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator [SEL_569821]
| outputColumnNames:["_col0","_col1","_col2","_col3"]
| Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
|<-Map 1 [SIMPLE_EDGE] vectorized
Reduce Output Operator [RS_569851]
key expressions:c1 (type: string), c2 (type: int), c3 (type: string)
Map-reduce partition columns:c1 (type: string), c2 (type: int)
sort order:++-
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
value expressions:c4 (type: string)
Filter Operator [FIL_569850]
predicate:(c1 = 4) (type: boolean)
Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
TableScan [TS_569818]
alias:t
Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
This behavior is not limited to the ROW_NUMBER window function: I've tested other window functions (RANK, COUNT, MAX, DENSE_RANK...) and got the same behavior.
I've tried the same example on MS SQL Server to check whether a different engine behaves the same way. There, predicates are pushed down even though window functions are specified in the select statement (I used the same tables and queries). I only got a similar behavior once I replaced the "SELECT c1,c2,c3,c4,d1,d2,d3,d4" with a "SELECT *": exposing the window functions' results to the outer select changed SQL Server's execution plan and forced full table scans & fetches. In my case in Hive, I'm listing only the needed columns.
Now that I've described my problem, let me explain what I need; maybe there is a better way to achieve it.
Tables t & u can contain multiple versions of each record. I want to expose to the end user a view that retrieves the latest available record from each table only if the latest version of the record in table t has c2=2. The version is determined by the highest value of columns c3 and d2 in tables t and u respectively. I've simulated the view with an outer select and wrote the query below (Query4):
CREATE VIEW Query4 as
SELECT *
FROM
(
  SELECT c1,c2,c3,c4,d1,d2,d3,d4
  FROM
  (
    SELECT * , ROW_NUMBER() OVER(PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM
    (
      SELECT *
      FROM (
        SELECT * , ROW_NUMBER() OVER(PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
      ) AS T1
      WHERE c2=2 and rwnbT = 1
    ) AS T2
    INNER JOIN u ON d1=c1
  ) AS T3
  WHERE rwnbU =1
) AS T4;
SELECT *
FROM Query4
WHERE c1 = 2 AND d3 = 1 ;
Adding the ROW_NUMBER function prevents the predicates "WHERE c1 = 2 AND d3 = 1" from reaching the inner queries, and thus leads Hive to fetch & load the whole tables.
In my example, this is not a big deal. The real tables contain around 3 billion records each. They are partitioned and stored as ORC. Through the view, predicates are not pushed down: the whole tables are fetched and loaded, and the filter is applied at the final step. Querying the view takes around 20 minutes and consumes a lot of the cluster's resources, whereas with the predicates pushed "manually" (as done in Query3), the result is returned within a few seconds...
Why is the plan not optimized by CBO although I've set hive.cbo.enable=true? Is this an optimizer bug?
Why does the number of rows in the plan not match the number of rows available in the table?
Does anyone know how to work around this problem? Is there an alternative way of writing the view that does not block predicate pushdown while keeping the query performant? (I've used window functions based on the recommendations of this article: https://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/)
Thanks & regards,
Chris
Labels: Apache Hive