Support Questions
Find answers, ask questions, and share your expertise

groupByKey() is failing on large datasets

Hello All,

I am running a job on a table with 2.2 billion rows; the job calls groupByKey() on an RDD.

spark-submit --packages com.databricks:spark-csv_2.11:1.4.0 --jars /appdata/hwx/hdp/2.4.2.0-258/hadoop/lib/aws-java-sdk-1.7.4.jar,/appdata/hwx/hdp/2.4.2.0-258/hadoop/hadoop-aws-2.7.1.2.4.2.0-258.jar,/appdata/hwx/hdp/2.4.2.0-258/hadoop/hadoop-aws.jar --driver-memory 20g --master yarn --deploy-mode client --executor-memory 16g --conf spark.driver.maxResultSize=10g --num-executors 18 /appdata/hwx/scripts/python_jobs/Runs/isaxtest_Spark_code.py

Below is the error:

[Stage 2:> (0 + 160) / 200]16/07/31 13:08:19 ERROR YarnScheduler: Lost executor 15 on ip-10-228-211-233: Container killed by YARN for exceeding memory limits. 18.3 GB of 18 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
[Stage 2:> (0 + 150) / 200]16/07/31 13:08:21 ERROR YarnScheduler: Lost executor 9 on ip-10-228-211-244: Container killed by YARN for exceeding memory limits. 18.4 GB of 18 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/07/31 13:08:21 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=8646295513446257068, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=81 cap=81]}} to ip-10-228-211-76/10.228.211.76:37971; closing connection java.nio.channels.ClosedChannelException
[Stage 2:> (0 + 140) / 200]16/07/31 13:08:24 ERROR YarnScheduler: Lost executor 2 on ip-10-228-211-76: Container killed by YARN for exceeding memory limits. 18.3 GB of 18 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
[Stage 2:> (0 + 130) / 200]16/07/31 13:08:27 ERROR YarnScheduler: Lost executor 3 on ip-10-228-211-140: Container killed by YARN for exceeding memory limits. 18.4 GB of 18 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

8 REPLIES

Re: groupByKey() is failing on large datasets

Super Guru
@shashi cheppela

You are running out of memory. How many columns do you have in your select? If you reduce the number of columns in the group by, the amount of memory required will be reduced. Otherwise, you need to allocate more memory.
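In RDD terms that means projecting each record down to the key plus only the columns you actually need before the groupByKey. A toy plain-Python illustration (not Spark) of the per-record saving:

```python
import sys

# Toy illustration (plain Python, not Spark): records that carry fewer
# fields are smaller, so each shuffled record needs less memory.
wide_record = ("key1", "colA", "colB", "colC", "colD", "colE")
slim_record = ("key1", "colA")  # keep only the columns the job needs

print(sys.getsizeof(wide_record) > sys.getsizeof(slim_record))  # True
```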

Re: groupByKey() is failing on large datasets

Expert Contributor

YARN is killing the container for exceeding its memory limit. What version of Spark are you using? Also, try using fewer, larger executors. I've found that 40-45 GB executors with 4 cores each are most effective. Don't go larger than that on memory, because GC pauses grow with heap size, and don't use more cores: beyond 4 cores per executor you start to lose returns on the extra cores.
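Concretely, that advice could be applied to the submit line from the question roughly like this (a sketch; the memory, overhead, and executor counts here are illustrative and would need tuning, and the --packages/--jars options from the original command are omitted for brevity):

```shell
# Illustrative only: fewer, larger executors with explicit YARN overhead.
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 20g \
  --num-executors 8 \
  --executor-cores 4 \
  --executor-memory 40g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  /appdata/hwx/scripts/python_jobs/Runs/isaxtest_Spark_code.py
```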

Re: groupByKey() is failing on large datasets

Hi mqureshi,

Thanks for the update. I am selecting 4 columns in the groupByKey().

Hi Joe,

Spark version is 1.6.1. I tried multiple runs of the job with different memory parameters. I am not specifying any cores when executing the job, so it takes the default cores from the system, and I am trying different numbers of executors. Can you please provide recommended memory configuration parameters?

Below is the server configuration:

Node Managers: 9

Memory on each node: 60 GB

Total cores: 16
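For reference, turning the earlier "4 cores per executor" suggestion into numbers for a cluster of this shape is simple arithmetic. The sketch below assumes roughly 1 core and 10% of memory reserved per node for the OS, daemons, and YARN container overhead; those reserves are illustrative guesses, not recommendations:

```python
# Illustrative sizing arithmetic (assumed reserves, not recommendations).
nodes = 9
cores_per_node = 16
mem_per_node_gb = 60

cores_per_executor = 4
# Reserve ~1 core per node for the OS and Hadoop daemons.
executors_per_node = (cores_per_node - 1) // cores_per_executor
total_executors = nodes * executors_per_node
# Split node memory across executors, keeping ~10% back for
# YARN container overhead (spark.yarn.executor.memoryOverhead).
mem_per_executor_gb = int(mem_per_node_gb / executors_per_node * 0.9)

print(executors_per_node, total_executors, mem_per_executor_gb)  # 3 27 18
```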

Regards,

Shashi

Re: groupByKey() is failing on large datasets

New Contributor

Hi @shashi cheppela,

Were you able to resolve this issue where groupByKey is failing on a larger dataset?

We have input data where a single key's set of values is about 5 GB in size, and we ran into the same issue you faced. We tried spark.executor.instances=40 and 70 GB per executor (35 GB heap + 35 GB overhead), but it didn't help.
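Worth noting: 5 GB of values behind a single key is likely to break groupByKey regardless of executor memory, because all values for one key must be materialized on one executor. A standard workaround for that kind of skew (not from this thread; the names and numbers below are illustrative) is two-phase aggregation with salted keys, sketched here in plain Python for a sum:

```python
import random
from collections import defaultdict

# Hypothetical workaround for one very hot key: two-phase ("salted")
# aggregation. Instead of sending every value for the hot key to one
# reducer, spread them across N sub-keys, combine, then merge partials.
random.seed(0)
N = 4
records = [("hot", 1)] * 1000 + [("cold", 1)] * 10

# Phase 1: append a random salt to the key, then reduce per salted key.
phase1 = defaultdict(int)
for key, value in records:
    phase1[(key, random.randrange(N))] += value

# Phase 2: strip the salt and merge the (at most N) partials per key.
totals = defaultdict(int)
for (key, _salt), partial in phase1.items():
    totals[key] += partial

print(dict(totals))  # {'hot': 1000, 'cold': 10}
```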

Could you please let us know how you overcame this issue?

Regards

Srinivas

Re: groupByKey() is failing on large datasets

Hi @srinivas Nalanagula,

Can you please share your cluster details? It looks like you are allocating a huge amount of memory to the job.

Regards,

Shashi

Re: groupByKey() is failing on large datasets

New Contributor

Hi @shashi cheppela

My cluster details are: 153 nodes, 72.96 TB of memory, and 5,814 cores.

There is no limitation on the memory or cores that I can use for my job.

Re: groupByKey() is failing on large datasets

New Contributor

Instead of using groupByKey, use reduceByKey. reduceByKey works much better on a large dataset because Spark knows it can combine output with a common key on each partition before shuffling the data.

// reduceByKey combines values per partition before the shuffle.
val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()

// groupByKey ships every (word, 1) pair across the network first.
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
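To see why the map-side combine matters, here is a plain-Python sketch (not Spark) of how many records each approach ships across the shuffle, for two hypothetical partitions of (word, 1) pairs:

```python
from collections import defaultdict

# Two hypothetical partitions of (word, 1) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("b", 1), ("a", 1)],
]

# groupByKey: every pair crosses the shuffle unchanged.
shuffled_by_group = [pair for part in partitions for pair in part]

# reduceByKey: pairs are combined within each partition first
# (map-side combine), so at most one record per key per partition
# crosses the shuffle.
shuffled_by_reduce = []
for part in partitions:
    local = defaultdict(int)
    for word, count in part:
        local[word] += count
    shuffled_by_reduce.extend(local.items())

print(len(shuffled_by_group))   # 7 records shuffled
print(len(shuffled_by_reduce))  # 4 records shuffled

# The final counts are identical either way.
totals = defaultdict(int)
for word, count in shuffled_by_reduce:
    totals[word] += count
print(dict(totals))  # {'a': 4, 'b': 3}
```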

Re: groupByKey() is failing on large datasets

New Contributor

Hi @Amaresh Dhal,

I tried with reduceByKey as well, and it resulted in the same issue.