Created 07-21-2016 04:35 AM
I want to use Spark's dynamic-allocation feature for my submitted applications, but the applications do not scale.
My cluster consists of 3 nodes and each has:
I use HDP 2.4 and set up all the needed dynamic-allocation properties as follows (they were preconfigured in HDP, but I verified them against the docs):
I use the YARN ResourceManager UI to look up the cores in use.
When I submit the following SparkPi example, only 3 containers with 1 core each are used. There were enough resources to allocate, but no more resources were used.
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    lib/spark-examples*.jar 10000
When I submit the SparkPi example with a defined number of executors, a lot more resources can be allocated. In this case it statically allocates 8 containers with 1 core each (1 driver + 7 executors).
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \
    --num-executors 7 \
    --driver-memory 512m \
    --executor-memory 512m \
    lib/spark-examples*.jar 100000
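From the docs I understand the dynamic-allocation properties could also be passed explicitly with --conf on spark-submit; as a sketch (the min/max executor values here are just examples, not what is configured on my cluster):

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=7 \
    lib/spark-examples*.jar 10000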
What did I do wrong, so that Spark does not automatically allocate the maximum of the available resources?
Thank you for your help 🙂
Created 07-21-2016 04:44 AM
@Andrew Watson have you seen this?
Created on 07-21-2016 05:09 AM - edited 08-19-2019 01:21 AM
Using the first command, only 3 cores are used.
Using the second command, 8 cores are used.
Created 07-21-2016 07:08 AM
I think SparkPi cannot effectively validate the functionality of dynamic allocation, basically because it runs so fast that it gives dynamic allocation little time to bring up more executors. For dynamic allocation, Spark needs to detect the current load (number of tasks), calculate the expected number of executors, and then issue requests to the AM/RM/NM through RPC; this pipeline usually takes several seconds. But I guess the whole SparkPi application only runs for several seconds, so it is too fast for dynamic allocation to fully request the expected resources. If you want to verify the functionality of dynamic allocation, the Spark Thrift Server is a good candidate.
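For reference, the ramp-up timing is governed by these properties; the default values below are from memory, so please double-check them against the docs for your Spark version:

spark.dynamicAllocation.schedulerBacklogTimeout 1s --> how long tasks must be backlogged before new executors are requested
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 1s --> interval between subsequent executor requests while the backlog persists
spark.dynamicAllocation.executorIdleTimeout 60s --> idle time after which an executor is released again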
Created 07-21-2016 07:34 AM
With an input value of 100000, SparkPi ran for about 3 minutes. I checked the number of cores several times, but nothing changed.
I have never used the Spark Thrift Server before. Can you tell me how to run a demo job?
Created 07-22-2016 12:58 AM
1. If initialExecutors = 5 as you set, the initial number of executors you monitor should be 5.
2. Another concern: if you want to bring the executor number up to the maximum, you need to submit jobs continuously to increase the load on the scheduler. In your case the SparkPi application has only one job, so the scheduler load drops right after that job is submitted, and Dynamic Allocation will then schedule the (lower) expected number of resources.
3. If you're using the latest version of HDP, the Thrift Server is already installed with dynamic allocation enabled by default. You could use beeline to submit SQL queries (see the example after this list).
4. The logic of the code is correct; I think what you need to do is try different scenarios that trigger it.
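A minimal way to push load through the Spark Thrift Server with beeline could look like this; host, user and table are placeholders, and the port depends on your setup (on HDP the Spark Thrift Server commonly listens on 10015, but check your config):

beeline -u "jdbc:hive2://<thrift-server-host>:10015/default" -n <user> \
    -e "SELECT COUNT(*) FROM <some_table>"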
Created 07-21-2016 04:55 PM
The documentation about Dynamic Resource Allocation says the following:
There are two requirements for using this feature. First, your application must set spark.dynamicAllocation.enabled to true. Second, you must set up an external shuffle service on each worker node in the same cluster and set spark.shuffle.service.enabled to true in your application. The purpose of the external shuffle service is to allow executors to be removed without deleting shuffle files written by them (more detail described below). The way to set up this service varies across cluster managers:

In standalone mode, simply start your workers with spark.shuffle.service.enabled set to true.

In Mesos coarse-grained mode, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all slave nodes with spark.shuffle.service.enabled set to true. For instance, you may do so through Marathon.

In YARN mode, start the shuffle service on each NodeManager as follows:
1. Locate the spark-<version>-yarn-shuffle.jar. This should be under $SPARK_HOME/network/yarn/target/scala-<version> if you are building Spark yourself, and under lib if you are using a distribution.
2. Add this jar to the classpath of all NodeManagers in your cluster.
3. In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService.
4. Restart all NodeManagers in your cluster.

All other relevant configurations are optional and under the spark.dynamicAllocation.* and spark.shuffle.service.* namespaces. For more detail, see the configurations page.
Reference Link:
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
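To make step 3 concrete, the yarn-site.xml entries would look roughly like this (a sketch based on the quoted docs; keep mapreduce_shuffle in the aux-services list if it is already configured on your cluster):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>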
Created 07-22-2016 07:34 PM
I hope it worked after starting the shuffle service, but FYI, here are the related HDP 2.4.0 documentation pages for configuring dynamic allocation for clusters and jobs.
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/ch_config-dea.html
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/config-dra-job.html
(If you're on 2.4.2, there's an equivalent set of pages under .../HDP-2.4.2/...)
Created 07-25-2016 10:23 AM
I think it was my problem, sorry.
I had only checked the configs in spark-thrift-sparkconf. After adding the configs to spark-defaults as well, it works.
Thank you for your help.
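For reference, a minimal sketch of the kind of entries that need to end up in spark-defaults (the values are just examples taken from this thread, not a recommendation):

spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.initialExecutors 5
spark.dynamicAllocation.minExecutors 1 --> example value, adjust to your cluster
spark.dynamicAllocation.maxExecutors 10 --> example value, adjust to your cluster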
Created 07-28-2017 08:19 AM
We ran into the same scenario: Zeppelin always launched 3 containers in YARN, even with the dynamic-allocation parameters enabled in Spark, because Zeppelin did not pick up these parameters.
To get Zeppelin to launch more than the 3 containers it launches by default, we had to configure the following in the Zeppelin Spark interpreter:
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.initialExecutors=0
spark.dynamicAllocation.minExecutors=2 --> start with a low value here; otherwise Spark launches the specified minimum number of containers but only actually uses the containers it needs, and the remaining memory and VCores are marked as reserved, which causes memory issues
spark.dynamicAllocation.maxExecutors=10
It is generally better to start with less executor memory (e.g. 10-15 GB) and more executors (20-30).
In our scenario we observed that with large executor memory (50-100 GB) and few executors (5-10), the query took 3 min 48 sec (228 sec), which makes sense because the parallelism is very low; after reducing the executor memory (10-15 GB) and increasing the number of executors (25-30), the same query took only 54 sec.
Please note that the number of executors and the executor memory are use-case dependent, and we did a few trials before getting optimal performance for our scenario.
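As a sketch, the interpreter settings for the smaller-executor layout described above could look like this (spark.executor.cores is an example value we did not mention above; the rest follow the ranges from this post and should be treated as starting points):

spark.executor.memory=15g
spark.executor.cores=2 --> example value only
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=30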