Created 07-29-2021 01:57 AM
Hello, I need your advice regarding what seems like strange behavior in the cluster I'm using. Running Spark 2.4 (Cloudera) on YARN, the configuration requests the setup of 10 executors:
from pyspark.sql import SparkSession

spark = (SparkSession
.builder.master("yarn")
.config("spark.executor.cores", "12")
.config("spark.executor.memory", "8G")
.config("spark.num.executors", "10")
.config("spark.driver.memory", "6G")
.config("spark.yarn.am.memoryOverhead", "6G")
.config("spark.executor.memoryOverhead", "5G")
.config("spark.driver.memoryOverhead", "5G")
.config("spark.sql.hive.convertMetastoreOrc", "true")
.config("spark.executor.heartbeatInterval", "60s")
.config("spark.network.timeout", "600s")
.config("spark.driver.maxResultSize", "2g")
.config("spark.driver.cores","4")
.config("spark.executor.extraClassPath", "-Dhdp.version=current")
.config("spark.debug.maxToStringFields", 200)
.config("spark.sql.catalogImplementation", "hive")
.config("spark.memory.fraction", "0.8")
.config("spark.memory.storageFraction", "0.2")
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "0")
.config("spark.yarn.maxAppAttempts", "10")
.appName(app_name)
.enableHiveSupport().getOrCreate())
However, the UI / YARN logs show the start of only 2 executors, with a third starting at a later stage to replace the second:
21/07/29 10:44:01 INFO Executor: Starting executor ID 3 on host <node name>
21/07/29 10:43:54 WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 10.0 GB of 10 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
The memory issues wouldn't have emerged had the application started the number of executors requested.
Can you think of any reason why the cluster may "choose" to reduce the number of executors it sets up? Is there any internal YARN or other configuration I should examine?
Created on 07-30-2021 08:57 AM - edited 07-30-2021 08:59 AM
Hi @RonyA
You haven't shared your dataset size. Apart from the data, you need to tune a few Spark parameters.
spark = (SparkSession
.builder.master("yarn")
.config("spark.executor.cores", "5") # you have mentioned 12
.config("spark.num.executors", "10")
.config("spark.executor.memory", "10G")
.config("spark.executor.memoryOverhead", "2G") # executor memory * 0.1 or 0.2 %
.config("spark.driver.memory", "10G")
.config("spark.driver.memoryOverhead", "2G") # driver memory * 0.1 or 0.2 %
.config("spark.sql.hive.convertMetastoreOrc", "true")
.config("spark.executor.heartbeatInterval", "60s") # default 10s
.config("spark.network.timeout", "600s") # default 120s
.config("spark.driver.maxResultSize", "2g")
.config("spark.driver.cores","4")
.config("spark.executor.extraClassPath", "-Dhdp.version=current")
.config("spark.debug.maxToStringFields", 200)
.config("spark.sql.catalogImplementation", "hive")
.config("spark.memory.fraction", "0.8")
.config("spark.memory.storageFraction", "0.2")
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "0")
.config("spark.yarn.maxAppAttempts", "10")
.appName(app_name)
.enableHiveSupport().getOrCreate())
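As a side note on the executor-count setting itself: the property Spark's documentation uses for a fixed number of executors on YARN is spark.executor.instances (the programmatic equivalent of --num-executors). A minimal sketch, with placeholder sizes that would need to fit your YARN container limits:
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder.master("yarn")
         .config("spark.dynamicAllocation.enabled", "false")   # fixed-size executor pool
         .config("spark.executor.instances", "10")             # number of executors to request
         .config("spark.executor.cores", "5")
         .config("spark.executor.memory", "10G")
         .config("spark.executor.memoryOverhead", "2G")
         .appName("fixed_executor_count_sketch")
         .enableHiveSupport()
         .getOrCreate())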
Apart from the above, if you are doing any kind of wide operation, a shuffle is involved. To set the shuffle partition count, use the calculation below:
spark.sql.shuffle.partitions = shuffle input size / HDFS block size
For example, if the shuffle input size is 10 GB and the HDFS block size is 128 MB, then the number of shuffle partitions is 10 GB / 128 MB = 80 partitions.
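If it helps, this is roughly how that calculation could look in code (a minimal sketch; the 10 GB and 128 MB figures are just the example numbers above, and spark is the session built earlier):
# Example numbers from the text above (placeholders, not measured values)
shuffle_input_bytes = 10 * 1024 ** 3    # 10 GB shuffle input
hdfs_block_bytes = 128 * 1024 ** 2      # 128 MB HDFS block size

num_partitions = max(1, shuffle_input_bytes // hdfs_block_bytes)   # = 80

spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))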
Also check whether you have enabled dynamic allocation: open the Spark UI --> select the application --> go to the Environment page --> find the spark.dynamicAllocation.enabled property.
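The same property can also be read from the running session (a small sketch; spark is the session created above, and "false" is just the fallback returned when the property is not set explicitly):
# Prints the configured value, or "false" when the property is not set explicitly
print(spark.conf.get("spark.dynamicAllocation.enabled", "false"))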
Created 08-01-2021 01:54 PM
Thanks for the detailed reply. The issue was encountered by other colleagues, and I ran into it only recently. I will forward the reply to my colleagues and test the proposed configuration once I get back to the office. As prior configurations worked well, following intensive tests on datasets of various kinds, I prefer not to apply dynamic allocation unless it is absolutely necessary. To my understanding, YARN should let users define the number of executors and build the cluster accordingly. I'll return with more info once we have tested the configuration you proposed.
Created 08-10-2021 03:47 AM
I've tried the configuration you provided with dynamic allocation enabled. The UI timeline still shows the start of only 2 executors, while the Environment tab shows num.executors=10 as set. The application is set up to run on datasets of diverse sizes, but I was able to reduce the load of calculating the large ones through code changes. The calculations involve shuffle operations (such as reduceByKey), but since the dataset size is not fixed, I don't see how I can use a fixed estimate of the shuffle input size when calculating spark.sql.shuffle.partitions. Thanks again for the tips, but please let me know how I can coerce the setup of a specific number of executors in the cluster, or which internal configuration I should look into to fix this issue.
Created 08-11-2021 08:26 PM
Hi @RonyA
If you are a Cloudera customer, please create a case and we will work on this issue.
Created 08-12-2021 12:03 AM
My client is a Cloudera customer. I will let him know. Many thanks for your help.
Created on 08-12-2021 01:32 AM - edited 08-12-2021 01:35 AM
I see 2 things that would be good to understand:
1) Why do the YARN containers exceed their size?
2) Why does YARN not provide the number of executors that you request?
1)
It seems like you are exceeding the YARN container size of 10 GB. The executors run in YARN containers. Maybe you need to increase the minimum YARN container size a bit? I think the message suggests the minimum container size for YARN is 10 GB. If you request an 8 GB executor, and there is some (2 GB) overhead, it might hit the ceiling of what was assigned to it, and this executor will exit.
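As a rough illustration of that arithmetic (a sketch using the hypothetical 8 GB + 2 GB figures above against the 10 GB limit from the warning):
# Hypothetical numbers matching the reasoning above
executor_memory_gb = 8      # spark.executor.memory (JVM heap)
observed_overhead_gb = 2    # off-heap memory used on top of the heap
container_limit_gb = 10     # physical memory limit YARN enforces on the container

print(executor_memory_gb + observed_overhead_gb >= container_limit_gb)   # True -> container gets killed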
2)
It looks like your cluster is not capable of providing the requested 10 executors of 8 GB each?
Other relevant info to share would be: how many nodes do you have, and for each node, how much memory is assigned to YARN and what is the YARN minimum container size?
Example:
Suppose the YARN container size is 3 GB. Suppose you have 9 nodes. Suppose your executor memory is 1 GB. Suppose 10 GB on each node is allocated to YARN. This means you have enough memory on each node to start 3 containers (3 x 3 GB < 10 GB). Therefore, when dynamic allocation is enabled, YARN will start 27 executors. Even if you were to ask for more than these 27, it would only be capable of providing 27.
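Expressed as a quick calculation (same hypothetical numbers as the example above):
# Hypothetical cluster from the example above
nodes = 9
yarn_memory_per_node_gb = 10    # memory allocated to YARN on each node
container_size_gb = 3           # YARN container size

containers_per_node = yarn_memory_per_node_gb // container_size_gb   # 3
print(nodes * containers_per_node)                                   # 27 executors at most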
Maybe this helps?
Created 08-12-2021 04:03 AM
Thanks for the detailed follow-up. Your suggestion that I'm not getting the requested executors because the "cluster is not capable of providing the requested 10 executors of 8GB" is an option my client should check. I will forward your suggestion to them so they can discuss it further in the case they'll open with Cloudera (I'm not their data engineer).
Created 08-12-2021 04:20 AM
Just adding that I double-checked with a reduced executor count and memory:
.config("spark.executor.memory", "3G")
.config("spark.num.executors", "5")
but the application still starts only 2 executors.
Created 08-13-2021 12:51 AM
Maybe you are still asking for more than what is available?
It really depends on what kind of cluster you have available.
It depends on the following parameters:
1) Cloudera Manager -> YARN -> Configuration -> yarn.nodemanager.resource.memory-mb (= amount of physical memory, in MiB, that can be allocated for containers = all the memory that YARN can use on one worker node)
2) yarn.scheduler.minimum-allocation-mb (Container Memory Minimum = every container will request at least this much memory)
3) yarn.nodemanager.resource.cpu-vcores (Container Virtual CPU Cores)
4) how many worker nodes does the cluster have?
I noticed you really are requesting a lot of cores too. Maybe you can try to reduce these a bit? This might also be a bottleneck.
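To make that concrete, a rough back-of-the-envelope sketch (all values are placeholders; plug in the numbers from Cloudera Manager and the Spark settings above):
# Placeholder values -- substitute your cluster's actual settings
yarn_memory_per_node_mb = 40960    # yarn.nodemanager.resource.memory-mb
yarn_vcores_per_node = 16          # yarn.nodemanager.resource.cpu-vcores
worker_nodes = 5

executor_container_mb = (8 + 5) * 1024   # spark.executor.memory + spark.executor.memoryOverhead
executor_cores = 12                      # spark.executor.cores

fit_by_memory = yarn_memory_per_node_mb // executor_container_mb   # 3 executors fit by memory
fit_by_cores = yarn_vcores_per_node // executor_cores              # only 1 fits by cores
executors_per_node = min(fit_by_memory, fit_by_cores)

print(executors_per_node * worker_nodes)   # upper bound on executors YARN can grant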