New Contributor
Posts: 5
Registered: ‎08-24-2017

Issues configuring and running Hive-On-Spark (CDH5.8.3)

Hey all,

 

I'm trying to get Hive-On-Spark configured for a CDH-5.8.3-1 install, and I'm hitting some snags related to memory.

 

The relevant (I hope) tuning parameters I am using are:

 

-- Default Settings for Cluster
SET hive.hadoop.supports.splittable.combineinputformat=true;
SET hive.exec.orc.default.stripe.size=268435456;
SET hive.optimize.skew.join=true;

-- Tuning parameters recommended internally (not sure if needed)
SET hive.exec.reducers.max=1200;
SET mapred.min.split.size=1000000000;
SET mapred.max.split.size=1000000000;

-- Tuning parameters used in most of our hive jobs to reduce spill (not sure if needed)
SET io.sort.mb = 1024;
SET io.sort.spill.percent = .90;
SET hive.cbo.enable=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;

-- Enable Hive-On-Spark and set memory generously 
-- Cluster limits yarn containers to 24G
SET spark.yarn.executor.memoryOverhead=4000;
SET spark.executor.memory=12G;
SET spark.executor.cores=3;
SET hive.execution.engine=spark;

-- Parameters taken from https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_hos_tuning.html
SET hive.optimize.reducededuplication.min.reducer=4;
SET hive.optimize.reducededuplication=true;
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=false;
SET hive.merge.smallfiles.avgsize=16000000;
SET hive.merge.size.per.task=256000000;
SET hive.merge.sparkfiles=true;
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=20000000;
SET hive.optimize.bucketmapjoin.sortedmerge=false;
SET hive.map.aggr.hash.percentmemory=0.5;
SET hive.map.aggr=true;
SET hive.optimize.sort.dynamic.partition=false;
SET hive.stats.autogather=true;
SET hive.stats.fetch.column.stats=true;
SET hive.compute.query.using.stats=true;
SET hive.limit.pushdown.memory.usage=0.4;
SET hive.optimize.index.filter=true;
SET hive.exec.reducers.bytes.per.reducer=67108864;
SET hive.smbjoin.cache.rows=10000;
SET hive.fetch.task.conversion=more;
SET hive.fetch.task.conversion.threshold=1073741824;
SET hive.optimize.ppd=true;

And the query looks something like this:

CREATE TABLE fact_table (
    serial_id STRING, --2 billion unique records
    common_id STRING, --10 million unique records
    [ 200 other columns of every imaginable type and size ]
) STORED AS ORC; --2 billion records, bucketed by common_id into 200 buckets

CREATE TABLE common_table (
    common_id STRING, -- fact_table, aggregated by common_id
    [ ~40 other columns ]
) STORED AS TEXTFILE; --10 million unique records, unbucketed

CREATE TABLE target_table (
    serial_id STRING,
    common_id STRING,
    [ all columns of fact_table and common_table ]
) STORED AS ORC; -- 2 billion records, bucketed by common_id into 200 buckets 

-- Grossly oversimplified:
INSERT OVERWRITE TABLE target_table 
SELECT f.*, c.*
FROM
    fact_table f
LEFT OUTER JOIN
    common_table c
ON
    f.common_id = c.common_id

So the use case is:

 

We have some fact table f, and we created an aggregate c based on a common key. We now want to join against the original fact table to produce target table t. Very simple ETL.

 

The query plan looks something like this:

 

Stage 0: 600 tasks (Map and Union the two tables)

Stage 1: 200 tasks (Repartition & Sort, then map partitions)

Stage 2: 200 tasks (Repartition & Sort, then map partitions)

 

My understanding is that Stage 0 is a deserialization stage, and basically just reads the source tables and maps the RDDs around by the common_id key. Stage 1 is the primary join, and Stage 2 is the bucket reordering.

 

In Stage 1, I get a bunch of tasks failing with:

 

WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 16.1 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
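
If I'm reading the numbers right, that 16 GB limit is simply the container size YARN granted for each executor, i.e. roughly spark.executor.memory plus spark.yarn.executor.memoryOverhead (this is my own back-of-the-envelope arithmetic, not something from the docs):

-- Approximate per-executor container request with my settings:
-- spark.executor.memory (12 GB) + spark.yarn.executor.memoryOverhead (4000 MB, ~3.9 GB) ~= 16 GB
-- so once off-heap usage grows past the ~4 GB of overhead, YARN kills the container.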

 

Hive-on-Mapreduce seems to handle this query just fine with significantly less memory, albeit very slowly. I'm wondering if there are some common parameters I'm just stupidly missing, or if maybe I'm running something a bit too big for Hive-on-Spark.

 

Thanks in advance! I'm happy to provide more information if I'm missing anything important!

 

Champion
Posts: 564
Registered: ‎05-16-2016

Re: Issues configuring and running Hive-On-Spark (CDH5.8.3)

Could you let me know the yarn.nodemanager.resource.memory-mb value?

New Contributor
Posts: 5
Registered: ‎08-24-2017

Re: Issues configuring and running Hive-On-Spark (CDH5.8.3)

The cluster is a hybrid cluster, so yarn.nodemanager.resource.* varies by host:

 

RAM  .......... resource.memory-mb
64G  ..........  34G
128G ..........  80G
256G .......... 176G

V-Cores ........... resource.vcores
32      ........... 26
40      ........... 30

Resources are capped by:

yarn.scheduler.maximum-allocation-mb = 24G
yarn.scheduler.maximum-allocation-vcores = 8
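
For what it's worth, here is how I think the current executor sizing maps onto those limits (my own rough numbers, please correct me if I'm off):

-- One executor container ~= 12G heap + ~4G overhead ~= 16G, which fits under the 24G allocation cap
-- That packs roughly 2 executors onto a 34G NodeManager, 5 onto an 80G node, and up to 11 onto a 176G node
-- (fewer if the 26/30 vcore caps run out first, since each executor asks for 3 cores)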
Champion
Posts: 564
Registered: ‎05-16-2016

Re: Issues configuring and running Hive-On-Spark (CDH5.8.3)

[ Edited ]

Looks like a memory overhead issue based on your error and values. You've got to tweak the Spark / YARN configuration based on your environment.

 

This is a good blog I have that explains the theory and calculation of the memory overhead for Spark.

The solution that he provides might sound crazy, but I actually tried it in my test environment.

 

Quoting from the blog:

 

" That is,to set the Yarn parameteryarn.nodemanager.resource.memory-mb to MORE THAN THE AVAILABLE PHYSICAL MEMORY (luckilyYarn does not check that). It also helps a little bit to setyarn.scheduler.minimum-allocation-mb to a small value like 100M, so an app doesnot get much more that what it asks for. " 

 

Hope this helps 

 

http://m.blog.csdn.net/oufuji/article/details/50387104

 

 
