
Hive mapper not initializing

Expert Contributor

I am trying to join two fairly large tables. Both are bucketed ORC tables (but bucketed on different columns).

SELECT tk.calday, round(avg(tk.onlineprice), 2) mv_price, round(avg(tp.zprice), 2) el_price
FROM
  (SELECT calday, onlineprice, article, marketid, cataloglevel4, city
   FROM price.tkonkurent
   WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
     AND marketid = 'movideo'
     AND city = 'Moscow'
     AND cataloglevel4 = 'fridge'
     AND article IS NOT NULL
     AND onlineprice < 20000
     AND availability = 'inStock'
     AND collectmethod = 'full'
  ) tk
JOIN
  (SELECT calday, material, zprice
   FROM price.toprice
   WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
     AND zcityc = '7702'
     AND distr_chain = 'I1'
  ) tp
  ON tk.calday = tp.calday
 AND tk.article = tp.material
GROUP BY tk.calday
ORDER BY tk.calday;

The problem is that the second mapper gets stuck in the initialization stage when I select more than 20 days (calday between '2016-01-01' and '2016-01-25'). With fewer days it works correctly.

It can stay in pending status for hours. There are no errors in the logs. After Map 1 finishes, only one container keeps running (the one with the AM).

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     75         75        0        0       1       0
Map 5           INITIALIZING     -1          0        0       -1       0       0
Reducer 2             INITED    366          0        0      366       0       0
Reducer 3             INITED    174          0        0      174       0       0
Reducer 4             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------
VERTICES: 01/05  [===>>-----------------------] 12%   ELAPSED TIME: 8122.10 s
--------------------------------------------------------------------------------

EXPLAIN PLAN

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
      DagName: hive_20160208101616_cf92edfe-7c45-4122-bb4f-5535b1ddf1ca:59
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: toprice
                  filterExpr: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                  Statistics: Num rows: 520800470 Data size: 558197823476 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
                    predicate: (((zcityc = '7702') and (distr_chain = 'I1')) and material is not null) (type: boolean)
                    Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), material (type: string), zprice (type: double)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col1 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col2 (type: double)
                      Select Operator
                        expressions: _col0 (type: string)
                        outputColumnNames: _col0
                        Statistics: Num rows: 65100059 Data size: 11978410856 Basic stats: COMPLETE Column stats: PARTIAL
                        Group By Operator
                          keys: _col0 (type: string)
                          mode: hash
                          outputColumnNames: _col0
                          Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                          Dynamic Partitioning Event Operator
                            Target Input: tkonkurent
                            Partition key expr: calday
                            Statistics: Num rows: 8724 Data size: 1605216 Basic stats: COMPLETE Column stats: PARTIAL
                            Target column: calday
                            Target Vertex: Map 5
            Execution mode: vectorized
        Map 5
            Map Operator Tree:
                TableScan
                  alias: tkonkurent
                  filterExpr: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                  Statistics: Num rows: 272469616 Data size: 1090024710512 Basic stats: COMPLETE Column stats: PARTIAL
                  Filter Operator
predicate: (((((((marketid = 'movideo') and (city = 'Moscow')) and (cataloglevel4 = 'fridge')) and article is not null) and (onlineprice < 20000.0)) and (availability = 'inStock')) and (collectmethod = 'full')) (type: boolean)
                    Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                    Select Operator
                      expressions: calday (type: string), onlineprice (type: double), article (type: string)
                      outputColumnNames: _col0, _col1, _col2
                      Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                      Reduce Output Operator
                        key expressions: _col0 (type: string), _col2 (type: string)
                        sort order: ++
                        Map-reduce partition columns: _col0 (type: string), _col2 (type: string)
                        Statistics: Num rows: 1419112 Data size: 261116608 Basic stats: COMPLETE Column stats: PARTIAL
                        value expressions: _col1 (type: double)
            Execution mode: vectorized
        Reducer 2
            Reduce Operator Tree:
              Merge Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {KEY.reducesinkkey0} {VALUE._col0}
                  1 {VALUE._col0}
                outputColumnNames: _col0, _col1, _col8
                Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), _col1 (type: double), _col8 (type: double)
                  outputColumnNames: _col0, _col1, _col8
                  Statistics: Num rows: 11548034365951 Data size: 2124838323334984 Basic stats: COMPLETE Column stats: PARTIAL
                  Group By Operator
                    aggregations: avg(_col1), avg(_col8)
                    keys: _col0 (type: string)
                    mode: hash
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Map-reduce partition columns: _col0 (type: string)
                      Statistics: Num rows: 31662559 Data size: 5825910856 Basic stats: COMPLETE Column stats: PARTIAL
                      value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>), _col2 (type: struct<count:bigint,sum:double,input:double>)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: avg(VALUE._col0), avg(VALUE._col1)
                keys: KEY._col0 (type: string)
                mode: mergepartial
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                Select Operator
                  expressions: _col0 (type: string), round(_col1, 2) (type: double), round(_col2, 2) (type: double)
                  outputColumnNames: _col0, _col1, _col2
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                    value expressions: _col1 (type: double), _col2 (type: double)
        Reducer 4
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: double), VALUE._col1 (type: double)
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 200 Basic stats: COMPLETE Column stats: PARTIAL
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Execution mode: vectorized

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

HIVE SETTINGS

hive.tez.container.size=5120
hive.tez.java.opts=-server -Xmx4096m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps
mapreduce.map.memory.mb=5120
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.memory.mb=8192
mapreduce.reduce.java.opts=-Xmx6554m
tez.runtime.io.sort.mb=2047
hive.auto.convert.sortmerge.join=true
hive.auto.convert.sortmerge.join.to.mapjoin=false
hive.convert.join.bucket.mapjoin.tez=false
hive.enforce.sortmergebucketmapjoin=true
hive.exec.submit.local.task.via.child=true
hive.mapjoin.bucket.cache.size=10000
hive.mapjoin.optimized.hashtable=true
hive.optimize.bucketmapjoin=true

YARN SETTINGS
yarn.nodemanager.resource.memory-mb=56500
yarn.scheduler.minimum-allocation-mb=5120
yarn.scheduler.maximum-allocation-mb=48000
yarn.nodemanager.resource.cpu-vcores=20

I also tried the settings below, but that only helps to avoid the failed task in Map 1. The main problem remains.

SET hive.tez.container.size=20480; 
SET hive.tez.java.opts=-Xmx16384m;
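
(For reference: with yarn.nodemanager.resource.memory-mb=56500, the 5120 MB containers allow up to 11 containers per node, while the 20480 MB containers allow only 2, so the bigger containers also sharply reduce how many tasks can run in parallel.)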

Super Collaborator

What is the total amount of memory dedicated to YARN in your cluster (check the YARN Web UI to find out)?

Could you try reducing the number of reducers (you have 366 + 174 on the main reduce vertices), for instance by playing with the variable hive.exec.reducers.bytes.per.reducer? If you only have 75 mappers as input, I am not sure you need that many reducers.
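
For instance, something like this (the values are only illustrative; tune them to your data volume):

-- Aim for roughly 512 MB of input per reducer instead of 64 MB (illustrative value):
SET hive.exec.reducers.bytes.per.reducer=512000000;
-- Optionally also cap the total number of reduce tasks (again, just an example):
SET hive.exec.reducers.max=200;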

Expert Contributor

@Sourygna Luangsay

I added the YARN settings to the question.

I changed hive.exec.reducers.bytes.per.reducer from 67108864 to 512000000:

        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1           INITIALIZING     -1          0        0       -1       0       0
Map 4 ..........   SUCCEEDED     85         85        0        0       0       0
Reducer 2             INITED      4          0        0        4       0       0
Reducer 3             INITED      1          0        0        1       0       0
-------------------------------------------------------------------------------- 

VERTICES: 01/04 [========================>>--] 94% ELAPSED TIME: 439.70 s

Super Collaborator

If you run the two subqueries of your join independently (one against price.tkonkurent and one against price.toprice; I think the one failing in your full query is tkonkurent, but let's make sure and execute both), do they work?
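
For example, a quick way to exercise each side in isolation is to run a count with the same filters as in your original query:

SELECT count(*) FROM price.tkonkurent
WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
  AND marketid = 'movideo' AND city = 'Moscow' AND cataloglevel4 = 'fridge'
  AND article IS NOT NULL AND onlineprice < 20000
  AND availability = 'inStock' AND collectmethod = 'full';

SELECT count(*) FROM price.toprice
WHERE calday BETWEEN '2016-01-01' AND '2016-01-25'
  AND zcityc = '7702' AND distr_chain = 'I1';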

Expert Contributor

Both subqueries work fine on their own.

The join itself also works for periods of fewer than 20 days; the problem occurs only when the period is greater than 20 days.

Master Guru

How many files do you have in each partition folder? I have had problems before when too many files in too many partitions exceeded the size limit for Tez query definitions (64 MB, I think). I do not remember whether it failed immediately or whether I had to look into the application logs to figure that out. But you said there were no errors, so this is weird.

Are these ORC files? If there are hundreds or thousands of files per day and this is the problem, you could try ALTER TABLE ... CONCATENATE to merge them into fewer files (or use something like Pig for text files, or change your loading process).
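
For example, to merge the files of a single day (the partition value is just an illustration; note that Hive may refuse to concatenate bucketed tables, since merging files would break the bucketing):

ALTER TABLE price.tkonkurent PARTITION (calday='2016-01-20') CONCATENATE;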

Expert Contributor

@Benjamin Leonhardi

Both tables are bucketed ORC.

tkonkurent has 2 buckets (CLUSTERED BY (article)), toprice has 3 buckets (CLUSTERED BY (material)).

The contents of the article and material columns are the same. However, in tkonkurent the article column is populated for only about 20% of rows (which is why there are only 2 buckets: one big bucket where article is null and a second, small one where article is not null).
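
A quick way to see this skew for one partition (the date is just an example; count(article) counts only the non-null values):

SELECT count(*) AS total_rows, count(article) AS rows_with_article
FROM price.tkonkurent
WHERE calday = '2016-01-20';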

tkonkurent has 2 files per partition:

hadoop fs -du -h /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-2*
236.1 M  /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-20/000000_0
13.8 M   /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-20/000001_0
225.8 M  /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-21/000000_0
16.6 M   /apps/hive/warehouse/price.db/tkonkurent/calday=2016-01-21/000001_0

toprice has 3 files per partition:

hadoop fs -du -h /apps/hive/warehouse/price.db/toprice/calday=2016-01-2*
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000000_0
35.6 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000001_0
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-20/000002_0
36.6 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000000_0
36.1 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000001_0
36.5 M  /apps/hive/warehouse/price.db/toprice/calday=2016-01-21/000002_0

Here are the table definitions:

CREATE TABLE toprice
(
  PLANT string,
  MATERIAL string,
  .... 35 columns ....
  ZCOPA3172 double
)
PARTITIONED BY (CALDAY string)
CLUSTERED BY (MATERIAL) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB", "orc.stripe.size"="67108864", "orc.row.index.stride"="10000");

CREATE TABLE tkonkurent
(
  additionalaction string,
  article string,
  .... 46 columns ....
  createdtime string
)
PARTITIONED BY (calday string)
CLUSTERED BY (article) SORTED BY (article ASC, marketid ASC) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('orc.compress'='ZLIB', 'orc.row.index.stride'='10000', 'orc.stripe.size'='67108864');

Benjamin, do you know what happens during the mapper initialization stage?

Master Guru

That is definitely not too many files. During init, all objects are initialized, tasks are placed, context objects are created, and so on. I was wondering whether dimension tables are also loaded during task creation (in the case of a map-side join), but you don't have one; your join happens in the reducer. So I am really at a loss as to what could go wrong here. I think you might have to open a support ticket for this one.

Expert Contributor (Accepted Solution)

@Benjamin Leonhardi, @Sourygna Luangsay

I solved the problem! I was hitting this bug: IndexOutOfBoundsException with RemoveDynamicPruningBySize.

(I forgot to mention that I use Hive 0.14.)

With this setting, my join works even over a six-month period:

set hive.tez.dynamic.partition.pruning=false;

But what about other queries? As far as I know, dynamic partition pruning on Tez is a very useful feature; I would not want to disable it globally...
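
For now I can at least scope the workaround to the session or script that runs this join, since a SET statement only affects the current session:

SET hive.tez.dynamic.partition.pruning=false;
-- ... run the problematic join here ...
SET hive.tez.dynamic.partition.pruning=true;  -- restore the default afterwards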

Master Guru

Glad that you fixed it. Unfortunately, I don't really know how common this error is; the patch code is pretty complex. You definitely do not want to disable the map-join conversion. I don't suppose an upgrade is in sight?