Created 02-08-2016 07:24 AM
I am trying to join two fairly large tables. Both are bucketed ORC tables (but bucketed on different columns).
SELECT tk.calday,
       round(avg(tk.onlineprice), 2) mv_price,
       round(avg(tp.zprice), 2) el_price
FROM (select calday, onlinePrice, article, marketid, catalogLevel4, city
      from price.tkonkurent
      where calday between '2016-01-01' and '2016-01-25'
        and marketid = 'movideo'
        and city = 'Moscow'
        and cataloglevel4 = 'fridge'
        and article is not null
        and onlineprice < 20000
        and availability = 'inStock'
        and collectmethod = 'full') tk
join (select calday, material, zprice
      from price.toprice
      where calday between '2016-01-01' and '2016-01-25'
        and zcityc = '7702'
        and distr_chain = 'I1') tp
  ON tk.calday = tp.calday and tk.article = tp.material
GROUP BY tk.calday
ORDER BY tk.calday;
The problem is that the second mapper gets stuck in the initialization stage when I select more than 20 days (calday between '2016-01-01' and '2016-01-25'). If I select fewer days, it works correctly.
It can stay in pending status for hours, with no errors in the logs. After Map 1 completes, only one container is running (the one with the AM).
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     75         75        0        0       1       0
Map 5           INITIALIZING     -1          0        0       -1       0       0
Reducer 2             INITED    366          0        0      366       0       0
Reducer 3             INITED    174          0        0      174       0       0
Reducer 4             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/05  [===>>-----------------------] 12%  ELAPSED TIME: 8122.10 s
--------------------------------------------------------------------------------
EXPLAIN PLAN
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
      DagName: hive_20160208101616_cf92edfe-7c45-4122-bb4f-5535b1ddf1ca:59
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: toprice
                  filterExpr: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                  Statistics: Num rows: 520800470 Data size: 558197823476 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                    Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), material (type: string), zprice (type: double)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col1 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col2 (type: double)
                      Select Operator
                        expressions: _col0 (type: string)
                        outputColumnNames: _col0
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        Group By Operator
                          keys: _col0 (type: string)
                          mode: hash
                          outputColumnNames: _col0
                          Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                          Dynamic Partitioning Event Operator
                            Target Input: tkonkurent
                            Partition key expr: calday
                            Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                            Target column: calday
                            Target Vertex: Map 5
            Execution mode: vectorized
        Map 5
            Map Operator Tree:
                TableScan
                  alias: tkonkurent
                  filterExpr: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                  Statistics: Num rows: 272469616 Data size: 1090024710512 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                    Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), onlineprice (type: double), article (type: string)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col2 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col2 (type: string)
                        Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col1 (type: double)
            Execution mode: vectorized
        Reducer 2
            Reduce Operator Tree:
              Merge Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {KEY.reducesinkkey0} {VALUE._col0}
                  1 {VALUE._col0}
                outputColumnNames: _col0, _col1, _col8
                Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: double), _col8 (type: double)
                  outputColumnNames: _col0, _col1, _col8
                  Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                  Group By Operator
                    aggregations: avg(_col1), avg(_col8)
                    keys: _col0 (type: string)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: string)
                      Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                      value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>), _col2 (type: struct<count:bigint,sum:double,input:double>)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: avg(VALUE._col0), avg(VALUE._col1)
                keys: KEY._col0 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), round(_col1, 2) (type: double), round(_col2, 2) (type: double)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                    value expressions: _col1 (type: double), _col2 (type: double)
        Reducer 4
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: double), VALUE._col1 (type: double)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Execution mode: vectorized

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
HIVE SETTINGS
hive.tez.container.size=5120
hive.tez.java.opts=-server -Xmx4096m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps
mapreduce.map.memory.mb=5120
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.memory.mb=8192
mapreduce.reduce.java.opts=-Xmx6554m
tez.runtime.io.sort.mb=2047
hive.auto.convert.sortmerge.join=true
hive.auto.convert.sortmerge.join.to.mapjoin=false
hive.convert.join.bucket.mapjoin.tez=false
hive.enforce.sortmergebucketmapjoin=true
hive.exec.submit.local.task.via.child=true
hive.mapjoin.bucket.cache.size=10000
hive.mapjoin.optimized.hashtable=true
hive.optimize.bucketmapjoin=true

YARN
yarn.nodemanager.resource.memory-mb=56500
yarn.scheduler.minimum-allocation-mb=5120
yarn.scheduler.maximum-allocation-mb=48000
yarn.nodemanager.resource.cpu-vcores=20
I also tried with the settings below, but that only helps avoid the failed tasks in Map 1. The main problem remains.
SET hive.tez.container.size=20480;
SET hive.tez.java.opts=-Xmx16384m;
Created 02-08-2016 08:29 AM
What is the total amount of memory dedicated to YARN in your cluster? (Check the YARN Web UI to find out.)
Could you try reducing the number of reducers (you have 366 + 174 on the main reduce vertices) for instance by playing with the variable hive.exec.reducers.bytes.per.reducer? If you only have 75 mappers as input, I am not sure you need that many reducers.
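For reference, a session-level sketch of that tuning (the values below are illustrative examples, not recommendations; `hive.exec.reducers.bytes.per.reducer` and `hive.exec.reducers.max` are standard Hive settings):

```sql
-- Raise the per-reducer input target so Hive plans fewer reducers.
-- 512000000 (~512 MB) is an example value; tune against your own input sizes.
SET hive.exec.reducers.bytes.per.reducer=512000000;

-- Optionally cap the reducer count outright (100 is an arbitrary example).
SET hive.exec.reducers.max=100;
```

Both settings only affect the current session, so they can be tried per query without touching the cluster defaults.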
Created 02-08-2016 09:02 AM
I added the YARN settings to the question.
I changed hive.exec.reducers.bytes.per.reducer from 67108864 to 512000000:
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1           INITIALIZING     -1          0        0       -1       0       0
Map 4 ..........   SUCCEEDED     85         85        0        0       0       0
Reducer 2             INITED      4          0        0        4       0       0
Reducer 3             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/04  [========================>>--] 94%  ELAPSED TIME: 439.70 s
--------------------------------------------------------------------------------
Created 02-08-2016 12:19 PM
Do the two subqueries in your join (the one on price.tkonkurent and the one on price.toprice) work when you run them independently? I suspect the one failing in your overall query is the tkonkurent one, but let's make sure and execute both.
Created 02-08-2016 12:48 PM
Both subqueries separately work fine.
Actually, the join works for periods of less than 20 days. The problem occurs only when the period is greater than 20 days.
Created 02-08-2016 02:01 PM
How many files do you have in each partition folder? I have had problems before when too many files in too many partitions exceeded the size limit for Tez query definitions (64 MB, I think). I am not sure whether it failed immediately or whether I had to look into the application logs to figure that out. But you said there were no errors, so this is weird.
Are these ORC files? If there are hundreds or thousands of files per day and this is the problem, you could try ALTER TABLE ... CONCATENATE to merge them into fewer files (or use something like Pig for text files, or change your loading process).
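As a sketch, concatenating one partition of a table from this thread might look like the following (the partition value is just an example taken from the listings below; CONCATENATE works on ORC-stored tables/partitions):

```sql
-- Merge the small ORC files of a single partition into fewer, larger files.
ALTER TABLE price.tkonkurent PARTITION (calday='2016-01-20') CONCATENATE;
```

Note that this has to be run per partition, so a long date range would need one statement per calday value.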
Created 02-09-2016 07:28 AM
Both tables are bucketed ORC.
tkonkurent has 2 buckets (CLUSTERED BY (article)), toprice has 3 buckets (CLUSTERED BY (material)).
The article and material columns hold the same values. However, in tkonkurent the article column is only about 20% populated (that's why only 2 buckets: one big bucket with article = NULL, and a second, small one with article NOT NULL).
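A quick sanity check of that skew could look like this (a sketch; the table and columns are from the thread, but the query itself is mine, with one example partition):

```sql
-- Count rows by whether the bucketing key is NULL, for one example partition.
SELECT (article IS NULL) AS article_missing,
       count(*)          AS rows_cnt
FROM price.tkonkurent
WHERE calday = '2016-01-20'
GROUP BY (article IS NULL);
```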
tkonkurent: 2 files per partition
hadoop fs -du -h /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-2*
236.1 M  /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-20/000000_0
13.8 M   /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-20/000001_0
225.8 M  /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-21/000000_0
16.6 M   /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-21/000001_0
toprice: 3 files per partition
hadoop fs -du -h /apps/hive/warehouse/price.db/toprice/calday=2016-01-2*
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000000_0
35.6 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000001_0
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000002_0
36.6 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000000_0
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000001_0
36.5 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000002_0
Here are the table definitions:
create table toprice (
  PLANT string,
  MATERIAL string,
  .... 35 columns....
  ZCOPA3172 double
)
PARTITIONED BY (CALDAY string)
CLUSTERED BY (MATERIAL) INTO 3 BUCKETS
STORED AS ORC
tblproperties ("orc.compress"="ZLIB", "orc.stripe.size"="67108864", "orc.row.index.stride"="10000");
CREATE TABLE tkonkurent (
  additionalaction string,
  article string,
  .... 46 columns ....
  createdtime string
)
PARTITIONED BY (calday string)
CLUSTERED BY (article) SORTED BY (article ASC, marketid ASC) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('orc.compress'='ZLIB', 'orc.row.index.stride'='10000', 'orc.stripe.size'='67108864');
Benjamin, do you know what happens during the mapper initialization stage?
Created 02-09-2016 10:21 AM
That is definitely not too many files. During init, all objects are initialized, tasks are placed, context objects are created, and so on. I was wondering whether dimension tables are also loaded during task creation (in the case of a map-side join), but you don't have one; your join happens in the reducer. So I am really at a loss as to what could be going wrong here. I think you might have to open a support ticket for this one.
Created 02-09-2016 10:48 AM
@Benjamin Leonhardi, @Sourygna Luangsay
I solved the problem! I was hitting this bug: IndexOutOfBoundsException with RemoveDynamicPruningBySize.
(I forgot to mention that I use Hive 0.14.)
With this setting, my join works even over a six-month period.
set hive.tez.dynamic.partition.pruning=false;
But what about other queries? As far as I know, dynamic partition pruning on Tez is a very useful feature, and I would not want to disable it globally...
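One way to avoid a global change is to scope the workaround to the session or script that hits the bug; a sketch:

```sql
-- Disable dynamic partition pruning only for the problematic statement...
set hive.tez.dynamic.partition.pruning=false;

-- ... run the affected join here ...

-- ... then restore pruning for the rest of the session.
set hive.tez.dynamic.partition.pruning=true;
```

Session-level SET statements do not touch hive-site.xml, so other users and queries keep the default behaviour.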
Created 02-09-2016 01:07 PM
Glad you fixed it. Unfortunately, I don't really know how common this error is; the patch code is pretty complex. You definitely do not want to disable the mapjoin conversion. I don't suppose an upgrade is in sight?