08-24-2017 05:44 AM
I'm trying to get Hive-On-Spark configured for a CDH-5.8.3-1 install, and I'm hitting some snags as they relate to memory.
The relevant (I hope) tuning parameters I am using are:
-- Default Settings for Cluster
SET hive.hadoop.supports.splittable.combineinputformat=true;
SET hive.exec.orc.default.stripe.size=268435456;
SET hive.optimize.skew.join=true;

-- Tuning parameters recommended internally (not sure if needed)
SET hive.exec.reducers.max=1200;
SET mapred.min.split.size=1000000000;
SET mapred.max.split.size=1000000000;

-- Tuning parameters used in most of our hive jobs to reduce spill (not sure if needed)
SET io.sort.mb=1024;
SET io.sort.spill.percent=0.90;
SET hive.cbo.enable=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;

-- Enable Hive-On-Spark and set memory generously
-- Cluster limits yarn containers to 24G
SET spark.yarn.executor.memoryOverhead=4000;
SET spark.executor.memory=12G;
SET spark.executor.cores=3;
SET hive.execution.engine=spark;

-- Parameters taken from https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_hos_tuning.html
SET hive.optimize.reducededuplication.min.reducer=4;
SET hive.optimize.reducededuplication=true;
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=false;
SET hive.merge.smallfiles.avgsize=16000000;
SET hive.merge.size.per.task=256000000;
SET hive.merge.sparkfiles=true;
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=20000000;
SET hive.optimize.bucketmapjoin.sortedmerge=false;
SET hive.map.aggr.hash.percentmemory=0.5;
SET hive.map.aggr=true;
SET hive.optimize.sort.dynamic.partition=false;
SET hive.stats.autogather=true;
SET hive.stats.fetch.column.stats=true;
SET hive.compute.query.using.stats=true;
SET hive.limit.pushdown.memory.usage=0.4;
SET hive.optimize.index.filter=true;
SET hive.exec.reducers.bytes.per.reducer=67108864;
SET hive.smbjoin.cache.rows=10000;
SET hive.fetch.task.conversion=more;
SET hive.fetch.task.conversion.threshold=1073741824;
SET hive.optimize.ppd=true;
And the query looks something like this:
CREATE TABLE fact_table (
  serial_id STRING,  -- 2 billion unique records
  common_id STRING,  -- 10 million unique records
  [ 200 other columns of every imaginable type and size ]
) STORED AS ORC;  -- 2 billion records, bucketed by common_id into 200 buckets

CREATE TABLE common_table (
  common_id STRING,
  -- fact_table, aggregated by common_id
  [ ~40 other columns ]
) STORED AS TEXT;  -- 10 million unique records, unbucketed

CREATE TABLE target_table (
  serial_id STRING,
  common_id STRING,
  [ all columns of fact_table and common_table ]
) STORED AS ORC;  -- 2 billion records, bucketed by common_id into 200 buckets

-- Grossly oversimplified:
INSERT OVERWRITE TABLE target_table
SELECT f.*, c.*
FROM fact_table f
LEFT OUTER JOIN common_table c
  ON f.common_id = c.common_id;
So the use case is:
We have some fact table f, and we created an aggregate c based on a common key. We now want to join against the original fact table to produce target table t. Very simple ETL.
The query plan looks something like this:
Stage 0: 600 tasks (Map and Union the two tables)
Stage 1: 200 tasks (Repartition & Sort, then map partitions)
Stage 2: 200 tasks (Repartition & Sort, then map partitions)
My understanding is that Stage 0 is a deserialization stage, and basically just reads the source tables and maps the RDDs around by the common_id key. Stage 1 is the primary join, and Stage 2 is the bucket reordering.
In Stage 1, I get a bunch of tasks failing with:
WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 16.1 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
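For what it's worth, the 16 GB limit in that message lines up with my settings. This is my own back-of-envelope arithmetic (my assumption about how YARN sizes the container, not something I pulled from Spark's code):

```python
# Rough arithmetic: the container request is executor heap plus the
# off-heap overhead, rounded up by YARN to its allocation increment
# (1024 MB by default, I believe).
heap_mb = 12 * 1024        # spark.executor.memory=12G
overhead_mb = 4000         # spark.yarn.executor.memoryOverhead=4000
requested_mb = heap_mb + overhead_mb
increment_mb = 1024
container_mb = -(-requested_mb // increment_mb) * increment_mb  # ceil to increment
print(requested_mb, container_mb)  # 16288 16384 -> the "16 GB" in the error
```

So the executors are already sized right up against the limit the error reports, well under the 24G cluster cap.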
Hive-on-Mapreduce seems to handle this query just fine with significantly less memory, albeit very slowly. I'm wondering if there are some common parameters I'm just stupidly missing, or if maybe I'm running something a bit too big for Hive-on-Spark.
Thanks in advance! I'm happy to provide more information if I'm missing anything important!
08-28-2017 05:25 AM
The cluster is a hybrid cluster, so yarn.nodemanager.resource.* varies by host:
RAM ..... resource.memory-mb
64G ..... 34G
128G .... 80G
256G .... 176G

V-Cores ..... resource.vcores
32 .......... 26
40 .......... 30
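So by my rough math (assuming the ~16 GB containers from my settings in the first post), each node tier can only fit a handful of executors:

```python
# Rough math (mine): how many executor containers of
# spark.executor.memory=12G + 4000 MB overhead fit per
# NodeManager tier, using the resource.memory-mb values above.
container_mb = 12 * 1024 + 4000  # 16288 MB
for node_mb in (34 * 1024, 80 * 1024, 176 * 1024):
    print(node_mb // container_mb)  # 2, 5, 11 executors per node
```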
Resources are capped by:
08-28-2017 05:41 AM - edited 08-28-2017 05:44 AM
Looks like an overhead-memory issue based on your error and values; you'll need to tweak the Spark/YARN configuration for your environment.
Here is one good blog I have that explains the theory and the calculation of memory overhead for Spark.
The solution it provides might sound crazy, but I actually tried it in my test environment.
Quoting from the blog:
"That is, to set the Yarn parameter yarn.nodemanager.resource.memory-mb to MORE THAN THE AVAILABLE PHYSICAL MEMORY (luckily Yarn does not check that). It also helps a little bit to set yarn.scheduler.minimum-allocation-mb to a small value like 100M, so an app does not get much more than what it asks for."
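For reference, those two properties would go in yarn-site.xml roughly like this. The values here are purely illustrative placeholders, not recommendations for your cluster:

```xml
<!-- yarn-site.xml: illustrative values only -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <!-- per the blog: deliberately set above the node's physical RAM -->
  <value>40960</value>
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <!-- small increment so an app gets little more than it asks for -->
  <value>100</value>
</property>
```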
Hope this helps