
HIVE LLAP MapJoinMemoryExhaustionError

Explorer

query:

SELECT 1
FROM instalment it
JOIN payinstalment pt ON it.id = pt.id_instalment
JOIN df_contract_update_record dr ON it.id_credit = dr.id_credit
  AND dr.credit_type IN ("SS","SC","SQ","SF","SD")
WHERE it.status = "a"
  AND pt.status = "a"
  AND dr.status IN ("a","k","p")
  AND it.TYPE_INSTALMENT NOT IN (8,60,100)
  AND pt.DATE_PAY < "2017-12-02";

log:

Vertex failed, vertexName=Map 1, vertexId=vertex_1512129221053_0889_46_02, diagnostics=[Task failed, taskId=task_1512129221053_0889_46_02_000034, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : java.lang.RuntimeException: Map operator initialization failed
  at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:354)
  at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:188)
  at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:172)
  at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
  at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
  at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
  at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
  at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
  at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
  at org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:110)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Async Initialization failed. abortRequested=false
  at org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:464)
  at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:398)
  at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:564)
  at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:516)
  at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
  at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:335)
  ... 16 more
Caused by: org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError: Hash table loading exceeded memory limits for input: Map 2 numEntries: 101400000 estimatedMemoryUsage: 4294983912 effectiveThreshold: 2380123306 memoryMonitorInfo: { isLlap: true executorsPerNode: 8 maxExecutorsOverSubscribeMemory: 3 memoryOverSubscriptionFactor: 0.20000000298023224 memoryCheckInterval: 100000 noConditionalTaskSize: 64000000 adjustedNoConditionalTaskSize: 102400000 hashTableInflationFactor: 2.0 threshold: 204800000 }
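Reading the numbers in that error, they appear to relate as follows (my own back-of-the-envelope interpretation of this specific log, not an official formula):

-- values copied from the error message above
-- noConditionalTaskSize         = 64000000   (hive.auto.convert.join.noconditionaltask.size)
-- adjustedNoConditionalTaskSize = 102400000  = 64000000 * (1 + 3 * 0.2)
--                                 (maxExecutorsOverSubscribeMemory * memoryOverSubscriptionFactor)
-- threshold                     = 204800000  = 102400000 * 2.0 (hashTableInflationFactor)
-- estimatedMemoryUsage          = 4294983912 (~4 GB) for 101400000 hash table entries,
--                                 which exceeds effectiveThreshold = 2380123306 (~2.2 GB),
--                                 so hash table loading is aborted with MapJoinMemoryExhaustionError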

I have already run ANALYZE TABLE on all of the tables involved.
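That is, table-level statistics along these lines (statements reconstructed from the table names in the query; partitioned-table syntax may differ):

ANALYZE TABLE instalment COMPUTE STATISTICS;
ANALYZE TABLE payinstalment COMPUTE STATISTICS;
ANALYZE TABLE df_contract_update_record COMPUTE STATISTICS;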

With hive.auto.convert.join.noconditionaltask.size=64000000, the execution plan is:

Plan optimized by CBO.

Vertex dependency in root stage
Map 1 <- Map 2 (BROADCAST_EDGE), Map 3 (BROADCAST_EDGE)

But table INSTALMENT has

'rawDataSize'='386165513746',
'totalSize'='12865350898',

and table PAYINSTALMENT has

'rawDataSize'='105397600207',
'totalSize'='4386069409',

Both of these are far larger than noConditionalTaskSize, so why was the common join still converted to a map join? It should have stayed a common join.

With hive.auto.convert.join.noconditionaltask.size=-1, the execution plan is:

Plan optimized by CBO.

Vertex dependency in root stage
Map 1 <- Reducer 5 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
Reducer 3 <- Map 6 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE)
Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE)
Reducer 7 <- Map 6 (CUSTOM_SIMPLE_EDGE)

It seems that LLAP cannot correctly detect the table size: no matter how big the table is, the common join is converted to a map join, and then MapJoinMemoryExhaustionError is thrown.
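For now, the only workaround I see is forcing the common join by hand (hive.auto.convert.join is a standard Hive property, though disabling it is a blunt instrument):

SET hive.auto.convert.join=false;
-- then run the query; all joins are executed as common (shuffle) joins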

1 ACCEPTED SOLUTION

Cloudera Employee

Map join decisions are not based on table stats but rather on the operator stats just above the join operator. If your query has filters, projections, and aggregations, the stats can be reduced dramatically by the time they reach the join operator, so you want to look at the data size of the operator just above the map join operator. Also make sure all operators show "Basic stats : COMPLETE Column stats : COMPLETE" in the explain plan. One other thing: LLAP decides map joins differently than MR/Tez. For more information about how LLAP decides map joins, please refer to https://community.hortonworks.com/articles/149998/map-join-memory-sizing-for-llap.html
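For example (a sketch: EXPLAIN is standard HiveQL, but the exact layout of the Statistics lines varies by Hive version):

EXPLAIN
SELECT 1 FROM instalment it
JOIN payinstalment pt ON it.id = pt.id_instalment;
-- In the output, find the operator feeding the Map Join and check its line:
--   Statistics: Num rows: ... Data size: ... Basic stats: COMPLETE Column stats: COMPLETE
-- The Data size there (not the table's totalSize) is what gets compared against
-- hive.auto.convert.join.noconditionaltask.size.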


5 REPLIES


Explorer

Thanks. Once I ran ANALYZE at both the table and column levels, LLAP figured it out correctly. But I have another question: if the table stats were missing, why did LLAP choose a map join instead of a common join? I upgraded from HDP 2.6.1 to HDP 2.6.3, and some queries that worked fine in HDP 2.6.1 now all throw MapJoinMemoryExhaustionError in HDP 2.6.3. I can tell they were executed as common joins in HDP 2.6.1 while table stats were missing, but in HDP 2.6.3 they are executed as map joins, which causes the MapJoinMemoryExhaustionError. Are there any properties I should configure?
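(For reference, I check whether stats are present with something like this; DESCRIBE FORMATTED is standard HiveQL, though column-stats output varies by version:)

DESCRIBE FORMATTED instalment;
-- look for numRows, rawDataSize, totalSize under Table Parameters
DESCRIBE FORMATTED instalment id;
-- column-level stats (min, max, distinct_count) for one column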

Cloudera Employee

If table stats are missing, the Hive optimizer guesses the stats based on file size, which can be way off since on-disk files are encoded and compressed. When available, the optimizer will use rawDataSize instead of totalFileSize; rawDataSize is supported only for some formats, and it is close to the in-memory data size when the table is loaded for processing. Even with rawDataSize, the optimizer's estimates can be way off compared to actual values, because some operators, such as filters and group-bys, need column statistics to make better decisions (specifically the number of distinct column values, NDV). Even with column statistics there are cases where join decisions go wrong, since NDV values are not merged correctly across partitions in some older releases. To fix the NDV merge, make sure to enable bitvector merging via hive.stats.ndv.algo="hll" and hive.stats.fetch.bitvector=true. With HyperLogLog, the NDV values for columns stay close to the actual value (<4% error) even after merging across partitions. With all these stats, the optimizer can better predict how many distinct values will actually be loaded into a hash table during the join, and so avoid MapJoinMemoryExhaustionError.
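A minimal sketch of enabling this, assuming the HDP 2.6.x property names (verify against your release):

SET hive.stats.ndv.algo=hll;          -- store NDVs as HyperLogLog bitvectors
SET hive.stats.fetch.bitvector=true;  -- let the optimizer fetch and merge them
-- recompute column statistics so the bitvectors actually get written:
ANALYZE TABLE instalment COMPUTE STATISTICS FOR COLUMNS;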

I am not exactly sure how the explain plans differ between HDP 2.6.1 and HDP 2.6.3; without looking at them it is hard to guess. But the overall idea is that the more statistics the optimizer has, the better its decisions will be, in increasing order of quality:

totalFileSize < rawDataSize < column stats < column stats with bitvector merging

Make sure you have column stats with bitvector merging enabled for optimal query planning.

The reason for MapJoinMemoryExhaustionError is that the optimizer estimates, for example, that a map join will load 10M unique keys into the hash table, whereas in reality there are 100M keys, which exceeds a single executor's fair share of memory.

Another thing to look into: make sure the vectorized map join operator is being used, as memory estimation for the vectorized map join operator is more accurate than for the non-vectorized version (estimating the memory usage of Java objects is hard).
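A quick way to check (hive.vectorized.execution.enabled is a standard Hive property; the exact explain wording varies by version):

SET hive.vectorized.execution.enabled=true;
EXPLAIN
SELECT 1 FROM instalment it
JOIN payinstalment pt ON it.id = pt.id_instalment;
-- in the plan, the map vertex should show "Execution mode: vectorized"
-- and the join should appear as a vectorized Map Join Operator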

Explorer

This still confuses me: the configuration is identical in HDP 2.6.1 and HDP 2.6.3, yet the query takes more time than in the old version.

I have no idea how to fix it; I am more inclined to downgrade to HDP 2.6.1...

BTW, do you know when Hive 2.2 will be released?

Cloudera Employee

Can you try re-running the query with hive.stats.fetch.bitvector=true?
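That is, in the same session, before the query:

SET hive.stats.fetch.bitvector=true;
-- then re-run the original SELECT ... JOIN query from the top of this thread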