
Spark dynamic allocation doesn't work

Contributor

I want to use Spark's dynamic allocation feature for my submitted applications, but the applications do not scale.

My cluster consists of 3 nodes and each has:

  • 4 cores
  • 8GB RAM
  • Spark: 1.6
  • YARN + MapReduce2: 2.7

I use HDP 2.4 and set up all required dynamic-allocation properties as follows (they were preconfigured in HDP, but I verified them against the docs):

  • spark.dynamicAllocation.enabled=true
  • spark.dynamicAllocation.initialExecutors=5
  • spark.dynamicAllocation.maxExecutors=10
  • spark.dynamicAllocation.minExecutors=1
  • spark.shuffle.service.enabled=true
  • yarn.nodemanager.aux-services=mapreduce_shuffle,spark_shuffle
  • yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService

I use the YARN ResourceManager UI to look up the cores in use.

When I submit the following SparkPi example, only 3 containers with 1 core each are used. There were enough resources to allocate, but no more were used.

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
lib/spark-examples*.jar 10000

When I submit the SparkPi example with a fixed number of executors, a lot more resources can be allocated. In this example 8 containers with 1 core each are allocated statically (1 driver + 7 executors).

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 7 \
--driver-memory 512m \
--executor-memory 512m \
lib/spark-examples*.jar 100000

What did I do wrong, so that Spark does not automatically allocate the maximum of the available resources?

Thank you for your help 🙂

1 ACCEPTED SOLUTION

Contributor

I think it was my mistake, sorry.

I had only checked the configs in spark-thrift-sparkconf. After adding the configuration to spark-defaults it works.
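
For anyone else who hits this: a sketch of how those entries end up in spark-defaults.conf (on HDP typically /usr/hdp/current/spark-client/conf/spark-defaults.conf; the path is an assumption and the values are just the ones from the question):

# dynamic-allocation entries in spark-defaults.conf (values taken from the question above)
spark.dynamicAllocation.enabled            true
spark.dynamicAllocation.initialExecutors   5
spark.dynamicAllocation.minExecutors       1
spark.dynamicAllocation.maxExecutors       10
spark.shuffle.service.enabled              true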

Thank you for your help.


9 REPLIES

Master Guru

@Andrew Watson have you seen this?

Contributor

With the first command, only 3 cores are used (screenshot: 5932-3-cores.png).

With the second command, 8 cores are used (screenshot: 5933-8-cores.png).

Contributor

I think SparkPi cannot effectively validate the functionality of dynamic allocation, basically because it runs so fast that it gives dynamic allocation little time to bring up more executors. Dynamic allocation needs to detect the current load (number of pending tasks), calculate the expected number of executors, and then issue requests to the AM/RM/NM over RPC; this pipeline usually takes several seconds. But I guess the whole SparkPi application only runs for several seconds, so it finishes too fast for dynamic allocation to fully request the expected resources. If you want to verify the functionality of dynamic allocation, the Spark Thrift Server is a good candidate.
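
For example, a deliberately slow job keeps tasks pending long enough for dynamic allocation to ramp up. A minimal sketch (partition count and sleep duration are arbitrary; run it from the Spark client node):

# 200 tasks of roughly 5 seconds each, so the stage runs long enough for dynamic allocation to request executors
./bin/spark-shell --master yarn-client <<'EOF'
sc.parallelize(1 to 2000, 200).map { i => Thread.sleep(500); i }.count()
EOF

Watch the executor count in the ResourceManager UI while this runs; it should climb toward spark.dynamicAllocation.maxExecutors as long as tasks remain pending.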

Contributor

With an input value of 100000, SparkPi needed about 3 minutes. I checked the number of cores repeatedly, but nothing changed.

I have never used the Spark Thrift Server before. Can you tell me how to run a demo job?

Contributor

1. If initialExecutors = 5 as you set it, the initial number of executors you observe should also be 5.

2. Another concern is that if you want to bring the executor count up to the maximum, you need to submit jobs continuously to increase the load on the scheduler. In your case the SparkPi application has only one job, so the scheduler load decreases right after that job is submitted, and dynamic allocation will scale down to the newly expected number of resources.

3. If you're using the latest version of HDP, the Thrift Server is already installed with dynamic allocation enabled by default. You can use beeline to submit SQL queries (see the example after this list).

4. The logic of the code is correct; I think what you need to do is try different scenarios that trigger that logic.
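
For example, something like this keeps submitting queries through the Spark Thrift Server so its scheduler stays loaded (the hostname, credentials, and table name are placeholders; HDP's Spark Thrift Server commonly listens on port 10015, adjust to your setup):

# submit a query ten times in a row against the Spark Thrift Server
for i in $(seq 1 10); do
  beeline -u jdbc:hive2://sts-host:10015/default -n hive \
          -e "SELECT COUNT(*) FROM sample_07;"
done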


@Rene Rene

The documentation about dynamic resource allocation says the following (emphasis mine); a sketch of the YARN steps on HDP follows the quote:

There are two requirements for using this feature. First, your application must set spark.dynamicAllocation.enabled to true. Second, you must set up an external shuffle service on each worker node in the same cluster and set spark.shuffle.service.enabled to true in your application. The purpose of the external shuffle service is to allow executors to be removed without deleting shuffle files written by them (more detail described below). The way to set up this service varies across cluster managers:

In standalone mode, simply start your workers with spark.shuffle.service.enabled set to true.

In Mesos coarse-grained mode, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all slave nodes with spark.shuffle.service.enabled set to true. For instance, you may do so through Marathon.

In YARN mode, start the shuffle service on each NodeManager as follows:

  1. Build Spark with the YARN profile. Skip this step if you are using a pre-packaged distribution.
  2. Locate the spark-<version>-yarn-shuffle.jar. This should be under $SPARK_HOME/network/yarn/target/scala-<version> if you are building Spark yourself, and under lib if you are using a distribution.
  3. Add this jar to the classpath of all NodeManagers in your cluster.
  4. In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService.
  5. Restart all NodeManagers in your cluster.

All other relevant configurations are optional and under the spark.dynamicAllocation.* and spark.shuffle.service.* namespaces. For more detail, see the configurations page.

Reference Link:

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
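
As a concrete sketch of the YARN steps above on an HDP 2.4 node (the paths follow the usual HDP layout but are assumptions; adjust them to your cluster, or let Ambari handle this):

# step 2: locate the shuffle-service jar shipped with the distribution
ls /usr/hdp/current/spark-client/lib/spark-*-yarn-shuffle.jar
# step 3: put the jar on the NodeManager classpath on every node
cp /usr/hdp/current/spark-client/lib/spark-*-yarn-shuffle.jar /usr/hdp/current/hadoop-yarn-nodemanager/lib/
# step 4: in yarn-site.xml (or via Ambari) add spark_shuffle to yarn.nodemanager.aux-services and set
#         yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService
# step 5: restart all NodeManagers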

Super Collaborator

I hope it worked after starting the shuffle service, but fyi here are three related HDP 2.4.0 documentation pages for clusters and jobs.

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/config-dra-manual.htm...

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/ch_config-dea.html

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_spark-guide/content/config-dra-job.html

(If you're on 2.4.2, there's an equivalent set of pages under .../HDP-2.4.2/...)


Expert Contributor

We ran into the same scenario where Zeppelin always launched 3 containers in YARN even with the dynamic allocation parameters enabled in Spark, because Zeppelin did not pick up those parameters.

To get Zeppelin to launch more than the default 3 containers, we had to configure the following in the Zeppelin Spark interpreter:

spark.dynamicAllocation.enabled=true

spark.shuffle.service.enabled=true

spark.dynamicAllocation.initialExecutors=0

spark.dynamicAllocation.minExecutors=2 --> start this with a low value; otherwise it will launch the specified minimum number of containers, only actually use the containers it needs (memory and VCores), and the rest of the memory and VCores will be marked as reserved, which causes memory issues

spark.dynamicAllocation.maxExecutors=10

It is generally good to start with less executor memory (e.g. 10-15 GB) and more executors (20-30).

In our scenario we observed that with large executor memory (50-100 GB) and few executors (5-10) the query took 3 min 48 sec (228 sec), which makes sense because parallelism is very low; after reducing the executor memory (10-15 GB) and increasing the number of executors (25-30), the same query took only 54 sec.

Please note that the number of executors and the executor memory are use-case dependent; we ran a few trials before reaching the optimal performance for our scenario.
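
Outside Zeppelin, the same lean-executor profile could be sketched on a plain spark-submit roughly as follows (the class name, jar, and memory figure are placeholders, not a recommendation for any particular workload):

# illustrative only: smaller executors with a higher dynamic-allocation ceiling
./bin/spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyApp \
  --executor-memory 10g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  myapp.jar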