Created 02-08-2016 07:24 AM
I am trying to join two fairly large tables. Both are bucketed ORC tables, but they are bucketed on different columns.
SELECT tk.calday,
       round(avg(tk.onlineprice),2) mv_price,
       round(avg(tp.zprice),2) el_price
FROM (select calday, onlinePrice, article, marketid, catalogLevel4, city
      from price.tkonkurent
      where calday between '2016-01-01' and '2016-01-25'
        and marketid = 'movideo'
        and city='Moscow'
        and cataloglevel4='fridge'
        and article is not null
        and onlineprice<20000
        and availability='inStock'
        and collectmethod='full' ) tk
join (select calday, material, zprice
      from price.toprice
      where calday between '2016-01-01' and '2016-01-25'
        and zcityc ='7702'
        and distr_chain='I1' ) tp
ON tk.calday=tp.calday and tk.article=tp.material
GROUP BY tk.calday
order by tk.calday;
The problem is that the second mapper (Map 5) gets stuck in the initialization stage when I select more than 20 days (for example, calday between '2016-01-01' and '2016-01-25'). If I select fewer days, it works correctly.
It can stay in the pending state for hours. There are no errors in the logs. After Map 1 finishes, only one container is running (the one with the AM).
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     75         75        0        0       1       0
Map 5           INITIALIZING     -1          0        0       -1       0       0
Reducer 2             INITED    366          0        0      366       0       0
Reducer 3             INITED    174          0        0      174       0       0
Reducer 4             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/05  [===>>-----------------------] 12%   ELAPSED TIME: 8122.10 s
--------------------------------------------------------------------------------
EXPLAIN PLAN
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
      DagName: hive_20160208101616_cf92edfe-7c45-4122-bb4f-5535b1ddf1ca:59
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: toprice
                  filterExpr: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                  Statistics: Num rows: 520800470 Data size: 558197823476 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                    Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), material (type: string), zprice (type: double)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col1 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col2 (type: double)
                      Select Operator
                        expressions: _col0 (type: string)
                        outputColumnNames: _col0
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        Group By Operator
                          keys: _col0 (type: string)
                          mode: hash
                          outputColumnNames: _col0
                          Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                          Dynamic Partitioning Event Operator
                            Target Input: tkonkurent
                            Partition key expr: calday
                            Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                            Target column: calday
                            Target Vertex: Map 5
            Execution mode: vectorized
        Map 5
            Map Operator Tree:
                TableScan
                  alias: tkonkurent
                  filterExpr: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                  Statistics: Num rows: 272469616 Data size: 1090024710512 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                    Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), onlineprice (type: double), article (type: string)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col2 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col2 (type: string)
                        Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col1 (type: double)
            Execution mode: vectorized
        Reducer 2
            Reduce Operator Tree:
              Merge Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {KEY.reducesinkkey0} {VALUE._col0}
                  1 {VALUE._col0}
                outputColumnNames: _col0, _col1, _col8
                Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: double), _col8 (type: double)
                  outputColumnNames: _col0, _col1, _col8
                  Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                  Group By Operator
                    aggregations: avg(_col1), avg(_col8)
                    keys: _col0 (type: string)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: string)
                      Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                      value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>), _col2 (type: struct<count:bigint,sum:double,input:double>)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: avg(VALUE._col0), avg(VALUE._col1)
                keys: KEY._col0 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), round(_col1, 2) (type: double), round(_col2, 2) (type: double)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                    value expressions: _col1 (type: double), _col2 (type: double)
        Reducer 4
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: double), VALUE._col1 (type: double)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Execution mode: vectorized

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
HIVE SETTING
hive.tez.container.size=5120
hive.tez.java.opts=-server -Xmx4096m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps
mapreduce.map.memory.mb=5120
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.memory.mb=8192
mapreduce.reduce.java.opts=-Xmx6554m
tez.runtime.io.sort.mb=2047
hive.auto.convert.sortmerge.join=true
hive.auto.convert.sortmerge.join.to.mapjoin=false
hive.convert.join.bucket.mapjoin.tez=false
hive.enforce.sortmergebucketmapjoin=true
hive.exec.submit.local.task.via.child=true
hive.mapjoin.bucket.cache.size=10000
hive.mapjoin.optimized.hashtable=true
hive.optimize.bucketmapjoin=true
YARN
yarn.nodemanager.resource.memory-mb=56500
yarn.scheduler.minimum-allocation-mb=5120
yarn.scheduler.maximum-allocation-mb=48000
yarn.nodemanager.resource.cpu-vcores=20
I also tried the settings below, but they only help to avoid failed tasks in Map 1. The main problem remains.
SET hive.tez.container.size=20480;
SET hive.tez.java.opts=-Xmx16384m;
Created 02-09-2016 10:48 AM
@Benjamin Leonhardi, @Sourygna Luangsay
I solved the problem! I was hitting this bug: IndexOutOfBoundsException with RemoveDynamicPruningBySize
(forgot to mention I use Hive 0.14)
With the following setting, my join works even over a six-month period.
set hive.tez.dynamic.partition.pruning=false;
But what about other queries? As far as I know, dynamic partition pruning on Tez is a very good feature, and I would not want to disable it globally.
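One possible way to avoid a global change is to turn pruning off only around the affected join and turn it back on afterwards. This is a sketch, not a tested fix, and it rests on two assumptions: that a SET issued in a Hive session affects only that session, and that hive.tez.dynamic.partition.pruning is true in the cluster configuration.

```sql
-- Assumption: SET is scoped to the current Hive session, so other
-- sessions and the cluster-wide configuration are not affected.
SET hive.tez.dynamic.partition.pruning=false;

-- The join from the original post, unchanged.
SELECT tk.calday,
       round(avg(tk.onlineprice),2) mv_price,
       round(avg(tp.zprice),2) el_price
FROM (select calday, onlinePrice, article, marketid, catalogLevel4, city
      from price.tkonkurent
      where calday between '2016-01-01' and '2016-01-25'
        and marketid = 'movideo'
        and city='Moscow'
        and cataloglevel4='fridge'
        and article is not null
        and onlineprice<20000
        and availability='inStock'
        and collectmethod='full' ) tk
join (select calday, material, zprice
      from price.toprice
      where calday between '2016-01-01' and '2016-01-25'
        and zcityc ='7702'
        and distr_chain='I1' ) tp
ON tk.calday=tp.calday and tk.article=tp.material
GROUP BY tk.calday
order by tk.calday;

-- Assumption: true is the value configured for the cluster; restore it
-- so later queries in this session still benefit from pruning.
SET hive.tez.dynamic.partition.pruning=true;
```

Queries run in other sessions, or in this session after the last statement, would keep using dynamic partition pruning as before.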
Created 02-09-2016 01:57 PM
Unfortunately not in the near future.