Hive mapper not initializing

Expert Contributor

I am trying to join two fairly large tables. Both are bucketed ORC tables, but they are bucketed on different columns.

SELECT tk.calday, round(avg(tk.onlineprice),2) mv_price, round(avg(tp.zprice),2) el_price
  FROM 
  (select calday, onlinePrice, article, marketid, catalogLevel4, city from price.tkonkurent
  where calday between '2016-01-01' and '2016-01-25'
  and marketid = 'movideo'
  and city='Moscow'
  and cataloglevel4='fridge'
  and article is not null
  and onlineprice<20000
  and availability='inStock'
  and collectmethod='full'
  ) tk
  join 
  (select calday, material, zprice from price.toprice
  where calday between '2016-01-01' and '2016-01-25'
  and zcityc ='7702'
  and distr_chain='I1' ) tp
  ON tk.calday=tp.calday
and tk.article=tp.material 
GROUP BY tk.calday
order by tk.calday;

The problem is that the second mapper (Map 5) hangs in the initialization stage when I select more than 20 days (for example, calday between '2016-01-01' and '2016-01-25'). If I select fewer days, the query works correctly.

The vertex can stay in this state for hours, and there are no errors in the logs. After Map 1 finishes, only one container is still running (the one with the AM).

  --------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     75         75        0        0       1       0
Map 5           INITIALIZING     -1          0        0       -1       0       0
Reducer 2             INITED    366          0        0      366       0       0
Reducer 3             INITED    174          0        0      174       0       0
Reducer 4             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/05  [===>>-----------------------] 12%   ELAPSED TIME: 8122.10 s
--------------------------------------------------------------------------------

EXPLAIN PLAN

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
      DagName: hive_20160208101616_cf92edfe-7c45-4122-bb4f-5535b1ddf1ca:59
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: toprice
                  filterExpr: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                  Statistics: Num rows: 520800470 Data size: 558197823476 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                    Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), material (type: string), zprice (type: double)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col1 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col2 (type: double)
                      Select Operator
                        expressions: _col0 (type: string)
                        outputColumnNames: _col0
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        Group By Operator
                          keys: _col0 (type: string)
                          mode: hash
                          outputColumnNames: _col0
                          Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                          Dynamic Partitioning Event Operator
                            Target Input: tkonkurent
                            Partition key expr: calday
                            Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                            Target column: calday
                            Target Vertex: Map 5
            Execution mode: vectorized
        Map 5
            Map Operator Tree:
                TableScan
                  alias: tkonkurent
                  filterExpr: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                  Statistics: Num rows: 272469616 Data size: 1090024710512 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                    Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), onlineprice (type: double), article (type: string)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col2 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col2 (type: string)
                        Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col1 (type: double)
            Execution mode: vectorized
        Reducer 2
            Reduce Operator Tree:
              Merge Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {KEY.reducesinkkey0} {VALUE._col0}
                  1 {VALUE._col0}
                outputColumnNames: _col0, _col1, _col8
                Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: double), _col8 (type: double)
                  outputColumnNames: _col0, _col1, _col8
                  Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                  Group By Operator
                    aggregations: avg(_col1), avg(_col8)
                    keys: _col0 (type: string)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: string)
                      Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                      value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>), _col2 (type: struct<count:bigint,sum:double,input:double>)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: avg(VALUE._col0), avg(VALUE._col1)
                keys: KEY._col0 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), round(_col1, 2) (type: double), round(_col2, 2) (type: double)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                    value expressions: _col1 (type: double), _col2 (type: double)
        Reducer 4
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: double), VALUE._col1 (type: double)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Execution mode: vectorized

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

HIVE SETTING

hive.tez.container.size=5120
hive.tez.java.opts=-server -Xmx4096m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps
mapreduce.map.memory.mb=5120
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.memory.mb=8192
mapreduce.reduce.java.opts=-Xmx6554m
tez.runtime.io.sort.mb=2047
hive.auto.convert.sortmerge.join=true
hive.auto.convert.sortmerge.join.to.mapjoin=false
hive.convert.join.bucket.mapjoin.tez=false
hive.enforce.sortmergebucketmapjoin=true
hive.exec.submit.local.task.via.child=true
hive.mapjoin.bucket.cache.size=10000
hive.mapjoin.optimized.hashtable=true
hive.optimize.bucketmapjoin=true

YARN
yarn.nodemanager.resource.memory-mb=56500
yarn.scheduler.minimum-allocation-mb=5120
yarn.scheduler.maximum-allocation-mb=48000
yarn.nodemanager.resource.cpu-vcores=20

I also tried the following settings, but they only help to avoid the failed tasks in Map 1. The main problem remains.

SET hive.tez.container.size=20480;
SET hive.tez.java.opts=-Xmx16384m;

1 ACCEPTED SOLUTION

Expert Contributor

@Benjamin Leonhardi, @Sourygna Luangsay

I solved the problem! I was hitting this bug: IndexOutOfBoundsException with RemoveDynamicPruningBySize.

(I forgot to mention that I use Hive 0.14.)

With the following setting, my join works even over a six-month period.

set hive.tez.dynamic.partition.pruning=false;

But what about other queries? As far as I know, dynamic partition pruning on Tez is a very good feature, and I would rather not disable it globally.
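
One way to limit the impact, sketched here as a suggestion rather than a tested recipe for this cluster, is to change the property only in the session that runs the affected join and restore it afterwards. This assumes the property is not locked down by the administrator (for example through a restricted or whitelisted parameter list) and that the default value is true, as it is in stock Hive 0.14.

```sql
-- Print the current value; SET with a property name and no value shows it.
SET hive.tez.dynamic.partition.pruning;

-- Turn dynamic partition pruning off for this session only.
SET hive.tez.dynamic.partition.pruning=false;

-- Run the affected join here (the query from the question).

-- Restore the default so later queries in the same session still benefit.
SET hive.tez.dynamic.partition.pruning=true;
```

A SET issued in a session does not change hive-site.xml, so other users and other sessions keep dynamic partition pruning enabled.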


10 REPLIES

Expert Contributor

Unfortunately not in the near future.