Created 02-08-2016 07:24 AM
I am trying to join two fairly large tables. Both are bucketed ORC tables, but they are bucketed on different columns.
SELECT tk.calday,
       round(avg(tk.onlineprice), 2) mv_price,
       round(avg(tp.zprice), 2) el_price
FROM (
  SELECT calday, onlinePrice, article, marketid, catalogLevel4, city
  FROM price.tkonkurent
  WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
    AND marketid = 'movideo'
    AND city = 'Moscow'
    AND cataloglevel4 = 'fridge'
    AND article IS NOT NULL
    AND onlineprice < 20000
    AND availability = 'inStock'
    AND collectmethod = 'full'
) tk
JOIN (
  SELECT calday, material, zprice
  FROM price.toprice
  WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
    AND zcityc = '7702'
    AND distr_chain = 'I1'
) tp
  ON tk.calday = tp.calday AND tk.article = tp.material
GROUP BY tk.calday
ORDER BY tk.calday;
The problem is that the second mapper (Map 5) hangs in the initialization stage when I select more than 20 days (for example, calday between '2016-01-01' and '2016-01-25'). If I select fewer days, the query works correctly.
The vertex can stay in pending status for hours, and there are no errors in the logs. After Map 1 finishes, only one container is still running (the one with the AM).
--------------------------------------------------------------------------------
VERTICES          STATUS         TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........  SUCCEEDED         75         75        0        0       1       0
Map 5             INITIALIZING      -1          0        0       -1       0       0
Reducer 2         INITED           366          0        0      366       0       0
Reducer 3         INITED           174          0        0      174       0       0
Reducer 4         INITED             1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/05 [===>>-----------------------] 12% ELAPSED TIME: 8122.10 s
--------------------------------------------------------------------------------
EXPLAIN PLAN
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
      DagName: hive_20160208101616_cf92edfe-7c45-4122-bb4f-5535b1ddf1ca:59
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: toprice
                  filterExpr: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                  Statistics: Num rows: 520800470 Data size: 558197823476 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                    Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), material (type: string), zprice (type: double)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col1 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col2 (type: double)
                      Select Operator
                        expressions: _col0 (type: string)
                        outputColumnNames: _col0
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        Group By Operator
                          keys: _col0 (type: string)
                          mode: hash
                          outputColumnNames: _col0
                          Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                          Dynamic Partitioning Event Operator
                            Target Input: tkonkurent
                            Partition key expr: calday
                            Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                            Target column: calday
                            Target Vertex: Map 5
            Execution mode: vectorized
        Map 5
            Map Operator Tree:
                TableScan
                  alias: tkonkurent
                  filterExpr: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                  Statistics: Num rows: 272469616 Data size: 1090024710512 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                    Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), onlineprice (type: double), article (type: string)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col2 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col2 (type: string)
                        Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col1 (type: double)
            Execution mode: vectorized
        Reducer 2
            Reduce Operator Tree:
              Merge Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {KEY.reducesinkkey0} {VALUE._col0}
                  1 {VALUE._col0}
                outputColumnNames: _col0, _col1, _col8
                Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: double), _col8 (type: double)
                  outputColumnNames: _col0, _col1, _col8
                  Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                  Group By Operator
                    aggregations: avg(_col1), avg(_col8)
                    keys: _col0 (type: string)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: string)
                      Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                      value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>), _col2 (type: struct<count:bigint,sum:double,input:double>)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: avg(VALUE._col0), avg(VALUE._col1)
                keys: KEY._col0 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), round(_col1, 2) (type: double), round(_col2, 2) (type: double)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                    value expressions: _col1 (type: double), _col2 (type: double)
        Reducer 4
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: double), VALUE._col1 (type: double)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Execution mode: vectorized
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
HIVE SETTINGS
hive.tez.container.size=5120
hive.tez.java.opts=-server -Xmx4096m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps
mapreduce.map.memory.mb=5120
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.memory.mb=8192
mapreduce.reduce.java.opts=-Xmx6554m
tez.runtime.io.sort.mb=2047
hive.auto.convert.sortmerge.join=true
hive.auto.convert.sortmerge.join.to.mapjoin=false
hive.convert.join.bucket.mapjoin.tez=false
hive.enforce.sortmergebucketmapjoin=true
hive.exec.submit.local.task.via.child=true
hive.mapjoin.bucket.cache.size=10000
hive.mapjoin.optimized.hashtable=true
hive.optimize.bucketmapjoin=true
YARN SETTINGS
yarn.nodemanager.resource.memory-mb=56500
yarn.scheduler.minimum-allocation-mb=5120
yarn.scheduler.maximum-allocation-mb=48000
yarn.nodemanager.resource.cpu-vcores=20
I also tried the settings below, but they only help to avoid the failed tasks in Map 1. The main problem remains: Map 5 still never leaves the initialization stage.
SET hive.tez.container.size=20480;
SET hive.tez.java.opts=-Xmx16384m;
Created 02-09-2016 10:48 AM
@Benjamin Leonhardi, @Sourygna Luangsay
I solved the problem! I was hitting this bug: "IndexOutOfBoundsException with RemoveDynamicPruningBySize"
(I forgot to mention that I am using Hive 0.14.)
With the following setting, my join works even over a six-month period:
set hive.tez.dynamic.partition.pruning=false;
But what about other queries? As far as I know, dynamic partition pruning on Tez is a very useful feature, and I would rather not disable it globally.
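One possible middle ground, as a sketch rather than a confirmed fix: a SET statement issued in a Hive session only affects that session, so the property can be switched off just before the affected join and switched back on afterwards. This assumes the cluster-wide value of hive.tez.dynamic.partition.pruning is left at true (its default) and that the property is not locked down by the administrator.

```sql
-- Assumption: the global setting stays at true; this override is session-local.
SET hive.tez.dynamic.partition.pruning=false;

-- The join that hangs in Map 5 when dynamic partition pruning is enabled.
SELECT tk.calday,
       round(avg(tk.onlineprice), 2) mv_price,
       round(avg(tp.zprice), 2) el_price
FROM (
  SELECT calday, onlinePrice, article
  FROM price.tkonkurent
  WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
    AND marketid = 'movideo'
    AND city = 'Moscow'
    AND cataloglevel4 = 'fridge'
    AND article IS NOT NULL
    AND onlineprice < 20000
    AND availability = 'inStock'
    AND collectmethod = 'full'
) tk
JOIN (
  SELECT calday, material, zprice
  FROM price.toprice
  WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
    AND zcityc = '7702'
    AND distr_chain = 'I1'
) tp
  ON tk.calday = tp.calday AND tk.article = tp.material
GROUP BY tk.calday
ORDER BY tk.calday;

-- Re-enable pruning for any later queries in the same session.
SET hive.tez.dynamic.partition.pruning=true;
```

Other sessions and other queries keep dynamic partition pruning, so only the queries that trigger the bug pay the cost of scanning partitions that pruning would otherwise skip.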
Created 02-09-2016 01:57 PM
Unfortunately not in the near future.