Created 09-12-2018 11:22 PM
Hello,
I am facing a problem with predicate pushdown: it stops working when I add a window function to the selected columns.
My question is similar to this question. It is said that it has been resolved in Hive 2.1 (HIVE-12808). I have tested with Hive 2.1 and I still get the same problem. Below, you'll see how I reproduce this issue.
I'm using the TEZ engine and I've set all the variables below before executing my queries.
set hive.cbo.enable=true;
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
I've created two tables (t & u) stored as ORC, inserted some data into them, and computed statistics on all columns.
DROP TABLE IF EXISTS t PURGE;
CREATE TABLE t (
  c1 string,
  c2 INT,
  c3 string,
  c4 string
) STORED AS orc;
INSERT INTO t VALUES
  ('1',0, '1', '2016-01-01'), ('2',1, '1', '2016-01-01'), ('3',2, '2', '2016-01-02'), ('4',3, '2', '2016-01-02'),
  ('1',0, '1', '2016-01-01'), ('2',1, '1', '2016-01-01'), ('3',2, '2', '2016-01-02'), ('4',3, '2', '2016-01-02'),
  ('1',0, '1', '2016-01-01'), ('2',2, '1', '2016-01-01'), ('3',4, '2', '2016-01-02'), ('4',6, '2', '2016-01-02'),
  ('1',0, '1', '2016-01-01'), ('2',1, '1', '2016-01-01'), ('3',2, '2', '2016-01-02'), ('4',3, '2', '2016-01-02'),
  ('1',0, '1', '2016-01-01'), ('2',2, '1', '2016-01-01'), ('3',4, '2', '2016-01-02'), ('4',6, '2', '2016-01-02');
ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS;

DROP TABLE IF EXISTS u PURGE;
CREATE TABLE u (
  d1 string,
  d2 INT,
  d3 string,
  d4 string
) STORED AS orc;
INSERT INTO u VALUES ('1', 0, '1', '2016-01-01'), ('2', 1, '1', '2016-01-01'), ('3', 2, '2', '2016-01-02'), ('4', 3, '2', '2016-01-02');
INSERT INTO u VALUES ('1', 4, '1', '2016-01-01'), ('2', 5, '1', '2016-01-01'), ('3', 6, '2', '2016-01-02'), ('4', 7, '2', '2016-01-02');
ANALYZE TABLE u COMPUTE STATISTICS FOR COLUMNS;
Table t contains 20 rows and Table u contains 8 rows.
When I execute the below query (Query1):
EXPLAIN
SELECT *
FROM (
  SELECT c1, c2, c3, c4, d1, d2, d3, d4
  FROM (
    SELECT *
    FROM (
      SELECT * FROM (
        SELECT * FROM t
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1 = c1
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7;
Predicates are correctly pushed down to the inner sub-queries, as shown in the execution plan below:
Plan not optimized by CBO.

Vertex dependency in root stage
Map 1 <- Map 2 (BROADCAST_EDGE)

Stage-0
   Fetch Operator
      limit:-1
Stage-1
   Map 1 vectorized
      File Output Operator [FS_569514]
         compressed:true
         Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
         table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
         Select Operator [OP_569513]
            outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
            Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
            Map Join Operator [MAPJOIN_569512]
            |  condition map:[{"":"Inner Join 0 to 1"}]
            |  HybridGraceHashJoin:true
            |  keys:{"Map 2":"d1 (type: string)","Map 1":"_col0 (type: string)"}
            |  outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col6","_col7"]
            |  Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
            |<-Map 2 [BROADCAST_EDGE] vectorized
            |  Reduce Output Operator [RS_569509]
            |     key expressions:d1 (type: string)
            |     Map-reduce partition columns:d1 (type: string)
            |     sort order:+
            |     Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
            |     value expressions:d3 (type: string), d4 (type: string)
            |     Filter Operator [FIL_569508]
            |        predicate:((d1 is not null and (d2 = 7)) and (d1 = 4)) (type: boolean)
            |        Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
            |        TableScan [TS_569491]
            |           alias:u
            |           Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
            |<-Select Operator [OP_569511]
                  outputColumnNames:["_col0","_col1","_col2","_col3"]
                  Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator [FIL_569510]
                     predicate:(c1 = 4) (type: boolean)
                     Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                     TableScan [TS_569488]
                        alias:t
                        Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
The first weird behavior: based on the filter predicate on table t (predicate:(c1 = 4)), the filter operator in the execution plan estimates 4 rows instead of the 5 rows that actually match in table t. This is not my main problem; the returned result set contains the correct number of rows: 5.
My main problem is that once I modify the query by adding window functions to the select statement, the predicates are no longer pushed down to the sub-queries.
I execute the slightly modified query below (Query2), where I've only added a ROW_NUMBER() window function to the select statement:
EXPLAIN
SELECT *
FROM (
  SELECT c1, c2, c3, c4, d1, d2, d3, d4
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM (
      SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1 = c1
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7;
Predicates are NOT pushed down to the inner sub-queries: tables t & u are fully scanned and fetched, a MAP JOIN is done, and the filter is applied at the end using the predicates, as shown in the execution plan below.
Plan not optimized by CBO.

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
Stage-1
   Reducer 3
      File Output Operator [FS_569705]
         compressed:true
         Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
         table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
         Select Operator [SEL_569700]
            outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
            Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
            Filter Operator [FIL_569708]
               predicate:((_col6 = 7) and (_col0 = 4)) (type: boolean)
               Statistics:Num rows: 5 Data size: 2680 Basic stats: COMPLETE Column stats: COMPLETE
               PTF Operator [PTF_569699]
                  Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col6(DESC)","name:":"windowingtablefunction","partition by:":"_col5"}]
                  Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
                  Select Operator [SEL_569698]
                  |  outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
                  |  Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
                  |<-Reducer 2 [SIMPLE_EDGE]
                     Reduce Output Operator [RS_569697]
                        key expressions:_col5 (type: string), _col6 (type: int)
                        Map-reduce partition columns:_col5 (type: string)
                        sort order:+-
                        Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
                        value expressions:_col0 (type: string), _col1 (type: int), _col2 (type: string), _col3 (type: string), _col7 (type: string), _col8 (type: string)
                        Map Join Operator [MAPJOIN_569711]
                        |  condition map:[{"":"Inner Join 0 to 1"}]
                        |  HybridGraceHashJoin:true
                        |  keys:{"Reducer 2":"_col0 (type: string)","Map 4":"d1 (type: string)"}
                        |  outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
                        |  Statistics:Num rows: 32 Data size: 17152 Basic stats: COMPLETE Column stats: COMPLETE
                        |<-Map 4 [BROADCAST_EDGE] vectorized
                        |  Reduce Output Operator [RS_569715]
                        |     key expressions:d1 (type: string)
                        |     Map-reduce partition columns:d1 (type: string)
                        |     sort order:+
                        |     Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                        |     value expressions:d2 (type: int), d3 (type: string), d4 (type: string)
                        |     Filter Operator [FIL_569714]
                        |        predicate:d1 is not null (type: boolean)
                        |        Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                        |        TableScan [TS_569691]
                        |           alias:u
                        |           Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                        |<-Filter Operator [FIL_569709]
                              predicate:_col0 is not null (type: boolean)
                              Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
                              PTF Operator [PTF_569687]
                                 Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col2(DESC)","name:":"windowingtablefunction","partition by:":"_col0, _col1"}]
                                 Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
                                 Select Operator [SEL_569686]
                                 |  outputColumnNames:["_col0","_col1","_col2","_col3"]
                                 |  Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
                                 |<-Map 1 [SIMPLE_EDGE] vectorized
                                    Reduce Output Operator [RS_569713]
                                       key expressions:c1 (type: string), c2 (type: int), c3 (type: string)
                                       Map-reduce partition columns:c1 (type: string), c2 (type: int)
                                       sort order:++-
                                       Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
                                       value expressions:c4 (type: string)
                                       TableScan [TS_569684]
                                          alias:t
                                          Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
Another weird behavior in the plan: table t has 20 rows, of which 5 have c1 = 4, and table u has 8 rows, of which 1 has (d2 = 7) and (d1 = 4), yet the MAP JOIN estimates 32 rows...
From my understanding, the window function is applied to the filtered result, i.e. to what is specified by the WHERE clause, and not to the whole table. Thus predicates should be pushed down to the inner sub-query as in Query1, regardless of whether a window function is specified.
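(For what it's worth, whether such a pushdown is valid depends on which column the predicate references; here is a sketch on a hypothetical table r(k, v), which is not one of the tables above:)

```sql
-- Safe: k is the PARTITION BY column, so filtering on k only drops
-- whole partitions; ROW_NUMBER values inside the surviving
-- partitions are unchanged, and the predicate may be pushed below
-- the window function.
SELECT * FROM (
  SELECT k, v, ROW_NUMBER() OVER (PARTITION BY k ORDER BY v DESC) AS rn
  FROM r
) x
WHERE k = 4;

-- Unsafe: v is not a partitioning column; pushing "v = 7" below the
-- window function would remove rows from inside partitions and
-- renumber the remaining ones, changing rn.
SELECT * FROM (
  SELECT k, v, ROW_NUMBER() OVER (PARTITION BY k ORDER BY v DESC) AS rn
  FROM r
) x
WHERE v = 7;
```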
I've tried "manually" adding the predicates to the query as shown below (Query3):
EXPLAIN
SELECT *
FROM (
  SELECT c1, c2, c3, c4, d1, d2, d3, d4
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM (
      SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
        WHERE c1 = 4
      ) AS T1
    ) AS T2
    INNER JOIN u ON d1 = c1
    WHERE d2 = 7 AND d1 = 4
  ) AS T3
) AS T4
WHERE c1 = 4 AND d2 = 7;
The execution plan is as expected, similar to the one of Query1: both tables are filtered while being fetched, even though a window function is specified in the select statement (as in Query2), and the join is applied to the filtered results, not before. I still don't understand why 4 rows are estimated for t and not 5, knowing that in the end the query returns 5 rows...
Here is the execution plan for Query3:
Plan not optimized by CBO.

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

Stage-0
   Fetch Operator
      limit:-1
Stage-1
   Reducer 3
      File Output Operator [FS_569841]
         compressed:true
         Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
         table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"}
         Select Operator [SEL_569836]
            outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7"]
            Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
            Filter Operator [FIL_569844]
               predicate:((_col6 = 7) and (_col0 = 4)) (type: boolean)
               Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
               PTF Operator [PTF_569835]
                  Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col6(DESC)","name:":"windowingtablefunction","partition by:":"_col5"}]
                  Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                  Select Operator [SEL_569834]
                  |  outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col6","_col7","_col8"]
                  |  Statistics:Num rows: 4 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                  |<-Reducer 2 [SIMPLE_EDGE]
                     Reduce Output Operator [RS_569833]
                        key expressions:_col5 (type: string), 7 (type: int)
                        Map-reduce partition columns:_col5 (type: string)
                        sort order:+-
                        Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
                        value expressions:_col0 (type: string), _col1 (type: int), _col2 (type: string), _col3 (type: string), _col7 (type: string), _col8 (type: string)
                        Map Join Operator [MAPJOIN_569848]
                        |  condition map:[{"":"Inner Join 0 to 1"}]
                        |  HybridGraceHashJoin:true
                        |  keys:{"Reducer 2":"_col0 (type: string)","Map 4":"d1 (type: string)"}
                        |  outputColumnNames:["_col0","_col1","_col2","_col3","_col5","_col7","_col8"]
                        |  Statistics:Num rows: 4 Data size: 2128 Basic stats: COMPLETE Column stats: COMPLETE
                        |<-Map 4 [BROADCAST_EDGE] vectorized
                        |  Reduce Output Operator [RS_569853]
                        |     key expressions:d1 (type: string)
                        |     Map-reduce partition columns:d1 (type: string)
                        |     sort order:+
                        |     Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
                        |     value expressions:d3 (type: string), d4 (type: string)
                        |     Filter Operator [FIL_569852]
                        |        predicate:((d1 is not null and (d2 = 7)) and (d1 = 4)) (type: boolean)
                        |        Statistics:Num rows: 1 Data size: 268 Basic stats: COMPLETE Column stats: COMPLETE
                        |        TableScan [TS_569826]
                        |           alias:u
                        |           Statistics:Num rows: 8 Data size: 2144 Basic stats: COMPLETE Column stats: COMPLETE
                        |<-Filter Operator [FIL_569845]
                              predicate:(_col0 = 4) (type: boolean)
                              Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                              PTF Operator [PTF_569822]
                                 Function definitions:[{"Input definition":{"type:":"WINDOWING"}},{"order by:":"_col2(DESC)","name:":"windowingtablefunction","partition by:":"_col0, _col1"}]
                                 Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                                 Select Operator [SEL_569821]
                                 |  outputColumnNames:["_col0","_col1","_col2","_col3"]
                                 |  Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                                 |<-Map 1 [SIMPLE_EDGE] vectorized
                                    Reduce Output Operator [RS_569851]
                                       key expressions:c1 (type: string), c2 (type: int), c3 (type: string)
                                       Map-reduce partition columns:c1 (type: string), c2 (type: int)
                                       sort order:++-
                                       Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                                       value expressions:c4 (type: string)
                                       Filter Operator [FIL_569850]
                                          predicate:(c1 = 4) (type: boolean)
                                          Statistics:Num rows: 4 Data size: 1072 Basic stats: COMPLETE Column stats: COMPLETE
                                          TableScan [TS_569818]
                                             alias:t
                                             Statistics:Num rows: 20 Data size: 5360 Basic stats: COMPLETE Column stats: COMPLETE
This behavior is not limited to the ROW_NUMBER window function; I've tested other window functions (RANK, COUNT, MAX, DENSE_RANK...) and get the same behavior.
I've tried the same example on MS SQL Server, in order to check whether a different engine shows the same behavior. Predicates are pushed down correctly even though window functions are specified in the select statement (I used the same tables and queries). I got a similar behavior once I replaced "SELECT c1,c2,c3,c4,d1,d2,d3,d4" with "SELECT *": exposing the window function's output to the outer select changed SQL Server's execution plan and forced the compiler to do full table scans & fetches. In my case in Hive, I'm listing only the needed columns.
Now that I've exposed my problems, let me tell you about my needs; maybe there is a better way to achieve them.
Tables t & u can contain multiple versions of each record. I want to expose to the end user a view that retrieves the latest available record from each table, and only if the latest version of the record in table t has c2=2. The version is determined by the highest available value of columns c3 and d2 of tables t and u respectively. I've simulated the view with an outer select and wrote the query below (Query4):
CREATE VIEW Query4 AS
SELECT *
FROM (
  SELECT c1, c2, c3, c4, d1, d2, d3, d4
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY d1 ORDER BY d2 DESC) AS rwnbU
    FROM (
      SELECT *
      FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY c1, c2 ORDER BY c3 DESC) AS rwnbT
        FROM t
      ) AS T1
      WHERE c2 = 2 AND rwnbT = 1
    ) AS T2
    INNER JOIN u ON d1 = c1
  ) AS T3
  WHERE rwnbU = 1
) AS T4;
SELECT * FROM Query4 WHERE c1 = 2 AND d3 = 1;
Adding the ROW_NUMBER function prevented the predicates "WHERE c1 = 2 AND d3 = 1" from reaching the inner queries, and thus led Hive to fetch & load the whole tables.
In my example it is not a big deal, but the real tables contain around 3 billion records each; they are partitioned and stored as ORC. Through the view, predicates are not pushed down, the whole tables are fetched and loaded, and the filter is applied at the final step. Querying the view takes around 20 minutes and consumes a lot of the cluster's resources, while when "manually" pushing the predicates (as done in Query3), the result is returned within a few seconds...
Why is the plan not optimized by CBO although I'm setting hive.cbo.enable=true? Is this an optimizer bug?
Why does the number of rows in the plan not match the number of rows actually available in the table?
Does anyone know how to work around this problem? Is there an alternative way of writing the view that doesn't block predicate pushdown while keeping the query performant? (I've used window functions based on the recommendations of this article: https://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/)
Thanks & regards,
Chris
Created 09-12-2018 11:22 PM
@Christian EL HAKIM, thanks for reporting this.
First of all, it seems there is indeed a problem in your environment, since the plan is not optimized by CBO, as indicated in the explain output. If you explore the HS2 log files, you should see the actual reason why CBO is not optimizing the query.
Concerning the mismatch between the number of rows in the statistics and the actual results: the statistics only represent an estimate of the data coming out of the operators, so there is nothing wrong there.
In addition, I tried running Query2 with CBO in my environment and, for your specific example, CBO is not pushing the predicates either. I have created HIVE-17668 and I have a fix for it (this will be part of HDP-2.6.x). However, note that even then the predicates are not being pushed through the windowing functions themselves: the columns created by the windowing functions are pruned out of the query because they are not used by the outermost query, and only then can the filter predicates be pushed down. That is why, if your outermost query is "SELECT * ...", neither Hive nor SQL Server pushes the predicates down, since that would produce wrong results (e.g. the result coming out of ROW_NUMBER would be different if the filter predicate were pushed). Concerning the semantics of Query4, it seems that instead of using ROW_NUMBER + filter to get the first row, you could use FIRST_VALUE.
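(To make the FIRST_VALUE idea concrete, here is a hedged sketch on table t from the original post; this is only an illustration of the suggestion, not tested against the original data:)

```sql
-- FIRST_VALUE tags every row of a partition with the value taken
-- from the highest-c3 row, instead of filtering down to that row.
-- A DISTINCT (or equivalent) is therefore still needed to collapse
-- the duplicated rows.
SELECT DISTINCT
       c1, c2,
       FIRST_VALUE(c3) OVER (PARTITION BY c1, c2 ORDER BY c3 DESC) AS latest_c3,
       FIRST_VALUE(c4) OVER (PARTITION BY c1, c2 ORDER BY c3 DESC) AS latest_c4
FROM t
WHERE c2 = 2;
```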
Created 09-12-2018 11:22 PM
Thank you @jcamachorodriguez for your reply and for creating HIVE-17668. I'm glad to know that I'm not alone :)
I will analyze the HS2 log files in order to dig into the CBO problem.
Although I've also arrived at the same conclusion, that exposing the window function's output to the outermost query may alter the results, I'm still not able to construct an example where pushing the predicates changes the results depending on whether a window function is present in the select statement or not... If you have any example showing this behavior, I would be grateful.
Concerning FIRST_VALUE: according to my understanding, it will not meet my needs, because it does not filter the records. If I have 2 versions of the same record id, using FIRST_VALUE (on all the columns) will return both records with the correct values, and then I would have to use a DISTINCT. I'm not sure that would be more efficient than ROW_NUMBER + a filter; correct me if I'm wrong...
Regards,
Christian
Created 09-12-2018 11:22 PM
We also faced a similar issue, and this approach helped me push predicates down into the subquery even with a "WITH" clause. It even works in Hive 1.2.1:
https://stackoverflow.com/questions/13523049/hive-sql-find-the-latest-record
The limitations are that:
a) it is only useful for MAX logic;
b) it doesn't offer the flexibility of windowing over different orderings, i.e. col1 DESC, col2 ASC, col3 DESC, etc.
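(For reference, a hedged sketch of that MAX-based pattern applied to table t from this thread, assuming "latest version" means the highest c3 within each (c1, c2) group; this mirrors the idea in the linked answer rather than quoting it:)

```sql
-- Latest version per (c1, c2) via MAX + self-join instead of a
-- window function. Because the aggregation is an ordinary subquery,
-- outer predicates on c1/c2 can be pushed into both branches.
-- Only works for "highest value wins" logic, as noted above.
SELECT t.*
FROM t
INNER JOIN (
  SELECT c1, c2, MAX(c3) AS max_c3
  FROM t
  GROUP BY c1, c2
) latest
  ON  t.c1 = latest.c1
  AND t.c2 = latest.c2
  AND t.c3 = latest.max_c3;
```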
Thanks,
Sarath