Dynamic Allocation in Apache Spark

1. Introduction

 

In Apache Spark, resource allocation is a critical aspect of optimizing the performance of Spark applications.

 

Spark provides two mechanisms to allocate resources: 

  1. Static Resource Allocation (SRA)
  2. Dynamic Resource Allocation (DRA)

1. Static Resource Allocation

In static resource allocation, resources are pre-allocated to the Spark application before it starts running. The amount of resources is fixed and cannot be changed at runtime. If the application needs more resources than were allocated, it will run longer or may even fail.

Static resource allocation is suitable for scenarios where the resource requirements of the Spark application are known in advance and the workload is consistent throughout the job.

Disadvantages of Static Resource Allocation

  1. Inefficient Resource Utilization: Static resource allocation mode may lead to inefficient resource utilization if the allocated resources are not fully utilized by the Spark application. This can result in idle resources and suboptimal performance.

  2. Limited Flexibility: Static allocation mode does not allow the Spark application to adjust the allocated resources during runtime based on the workload. This can result in out-of-memory errors or insufficient resources when the workload increases.

2. Dynamic Resource Allocation

In dynamic resource allocation, the resources are allocated to the Spark application on an as-needed basis during runtime. The allocation is adjusted dynamically based on the workload and usage patterns of the application. This allows for better resource utilization and can help avoid underutilization or overutilization of resources.

 

Dynamic resource allocation is suitable for scenarios where the resource requirements of the Spark application are unknown or variable, and the workload is not consistent throughout the job.

Dynamic Resource Allocation (DRA)

 

Dynamic allocation is a feature in Apache Spark that allows for automatic adjustment of the number of executors allocated to an application. This feature is particularly useful for applications that have varying workloads and need to scale up or down depending on the amount of data being processed. It can help optimize the use of cluster resources and improve application performance.

 

When dynamic allocation is enabled, Spark can dynamically allocate and deallocate executors based on the application workload. If the workload increases, Spark can automatically request additional executors to handle the load. Similarly, if the workload decreases, Spark can release executors to free up resources.

 

Note: 

  • In the Cloudera Data Platform (CDP), dynamic allocation is enabled by default.
  • Dynamic allocation is available for all the supported cluster managers, i.e., Spark Standalone, Hadoop YARN, Apache Mesos, and Kubernetes.

Advantages of Dynamic Allocation:

  1. Resource efficiency: Dynamic allocation enables Spark to allocate resources (CPU, memory, etc.) to the application based on the actual workload, which can help reduce the waste of resources and improve the overall efficiency of the cluster.

  2. Scalability: Dynamic allocation allows Spark to scale up or down the number of executors allocated to an application depending on the workload. This enables the application to handle spikes in traffic or data volumes without requiring manual intervention.

  3. Cost savings: By efficiently allocating resources, dynamic allocation can help reduce the overall cost of running Spark applications.

  4. Fairness: Dynamic allocation can help ensure that multiple applications running on the same cluster are allocated resources fairly based on their actual workload.

Disadvantages of Dynamic Allocation:

  1. Overhead: Dynamic allocation requires Spark to continuously monitor the workload and adjust the allocation of resources accordingly, which can create additional overhead and potentially impact the performance of the application.

  2. Latency: Because dynamic allocation involves monitoring and adjusting the allocation of resources, there may be a latency cost associated with switching the number of executors allocated to an application.

  3. Configuration complexity: Enabling dynamic allocation requires configuring several properties in Spark, which can increase the complexity of managing and deploying Spark applications.

  4. Unpredictability: Dynamic allocation can be unpredictable, and the number of executors allocated to an application may change frequently, which can make it difficult to monitor and optimize the application's performance.

  5. Increased network traffic: Dynamic allocation can increase network traffic in the cluster, as Spark needs to communicate with the cluster manager to request and release resources. This can result in increased overhead and impact the overall performance of the application.

  6. YARN NodeManager auxiliary Spark shuffle service overload: When using dynamic allocation on YARN, the NodeManager auxiliary service that serves shuffle data can become overloaded if the resources available to it are insufficient for the application's workload. This can reduce performance and destabilize the cluster. Monitor resource allocation carefully and adjust the configuration so that the YARN NodeManager auxiliary Spark shuffle service has enough resources to handle the workload.

Resource Allocation Policy of scaling executors up and down:

Dynamic Allocation comes with the policy of scaling executors up and down as follows:

a) Scale-Up Policy

The scale-up policy requests new executors when there are pending tasks, and it increases the number of executors requested exponentially. This lets an application start cautiously, in case only a few additional executors are needed, while still ramping up quickly if many turn out to be required. The first request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and further requests are triggered every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter for as long as the queue of pending tasks persists. The number of executors requested in each round doubles from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8, and so on in subsequent rounds.
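The doubling described above can be illustrated with a small simulation. This is a simplified sketch in plain Python, not Spark's actual implementation: it assumes the task backlog never clears, and it ignores the additional cap Spark applies based on how many executors the pending tasks really need. The function name and the timeout values passed in are illustrative.

```python
def scale_up_schedule(backlog_timeout_s, sustained_timeout_s, rounds, max_executors):
    """Simulate exponential executor requests while a task backlog persists.

    Returns a list of (time_s, executors_added, total_executors) tuples.
    Assumption: the backlog persists for every round.
    """
    schedule = []
    total = 0
    to_add = 1                  # the first round adds a single executor
    time_s = backlog_timeout_s  # first request fires after schedulerBacklogTimeout
    for _ in range(rounds):
        if total >= max_executors:
            break
        # Never exceed spark.dynamicAllocation.maxExecutors.
        added = min(to_add, max_executors - total)
        total += added
        schedule.append((time_s, added, total))
        to_add *= 2             # the next round asks for twice as many
        time_s += sustained_timeout_s  # later rounds use the sustained timeout
    return schedule


for time_s, added, total in scale_up_schedule(1, 1, rounds=5, max_executors=20):
    print(f"t={time_s}s: +{added} executors, total={total}")
```

With maxExecutors set to 20, the last round in this sketch is truncated so that the total never goes above the configured maximum.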

 

b) Scale Down Policy

Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds. Note that, under most circumstances, this condition is mutually exclusive with the request condition, in that an executor should not be idle if there are still pending tasks to be scheduled. 
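The idle-timeout rule can be sketched in the same way. The snippet below is a hypothetical model in plain Python, not Spark code: executor ids, the `last_busy_at` bookkeeping, and the function name are assumptions made for illustration. It also assumes that executors are never removed below spark.dynamicAllocation.minExecutors.

```python
from typing import Dict, List


def executors_to_remove(last_busy_at: Dict[str, float], now: float,
                        idle_timeout_s: float, min_executors: int = 0) -> List[str]:
    """Return ids of executors idle for at least idle_timeout_s seconds.

    last_busy_at maps an executor id to the time it last ran a task.
    The result is limited so that at least min_executors remain.
    """
    idle = [eid for eid, busy_at in last_busy_at.items()
            if now - busy_at >= idle_timeout_s]
    # Release the longest-idle executors first.
    idle.sort(key=lambda eid: last_busy_at[eid])
    removable = max(0, len(last_busy_at) - min_executors)
    return idle[:removable]


executors = {"exec-1": 0.0, "exec-2": 50.0, "exec-3": 100.0}
print(executors_to_remove(executors, now=100.0, idle_timeout_s=60.0))
```

In this example only exec-1 has been idle for 60 seconds or more, so it is the only candidate for removal.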

Different kinds of Spark Applications:

There are several kinds of Spark applications that can be developed and run on a Spark cluster.

  1. Batch Applications
  2. Streaming Applications
  3. Structured Streaming Applications

Let's discuss how to enable dynamic allocation for each kind of application.

1. Enable Dynamic Allocation for Spark Batch Applications

To enable dynamic allocation for Spark batch applications, you need to use the following configuration properties:

 

| Property Name | Default Value | Description |
|---|---|---|
| spark.shuffle.service.enabled | false | Enables the external shuffle service. |
| spark.dynamicAllocation.enabled | false | Set this to true to enable dynamic allocation. |
| spark.dynamicAllocation.minExecutors | 0 | Set this to the minimum number of executors that should be allocated to the application. |
| spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | The initial number of executors to run if dynamic allocation is enabled. If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will be used as the initial number of executors. |
| spark.dynamicAllocation.maxExecutors | infinity | Set this to the maximum number of executors that should be allocated to the application. |

 

In addition to the above parameters, there are some additional parameters available. In most of the scenarios, the additional parameter default values are sufficient. 
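The rule for the initial number of executors given in the table above can be expressed as taking the largest of the relevant settings. The following is a minimal sketch in plain Python under that reading; the function and argument names are illustrative and are not part of any Spark API.

```python
def initial_executor_count(min_executors=0, initial_executors=None,
                           executor_instances=None):
    """Resolve the starting executor count for dynamic allocation.

    initial_executors defaults to min_executors when unset, and
    executor_instances (--num-executors) wins if it is larger.
    """
    if initial_executors is None:
        initial_executors = min_executors
    candidates = [min_executors, initial_executors]
    if executor_instances is not None:
        candidates.append(executor_instances)
    return max(candidates)


# minExecutors=0 and initialExecutors=1, as in the spark-submit example
print(initial_executor_count(min_executors=0, initial_executors=1))
# --num-executors=5 is larger, so it becomes the starting point
print(initial_executor_count(min_executors=0, initial_executors=1,
                             executor_instances=5))
```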

 

spark-submit command:

 

spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20 

 

Note: In the above spark-submit command, the minimum number of executors is 0 and the maximum is 20. Adjust these values to match the requirements of your application and cluster.
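When these values change between environments, it can help to keep them in one place and generate the command from them. The sketch below is only one possible approach, using the Python standard library: the helper name and the application path `my_app.py` are hypothetical placeholders, and the command is printed rather than executed.

```python
import shlex


def build_spark_submit(conf, app_path):
    """Build a spark-submit argument list from a dict of Spark properties."""
    cmd = ["spark-submit"]
    for key, value in conf.items():
        cmd.extend(["--conf", f"{key}={value}"])
    cmd.append(app_path)  # hypothetical application file
    return cmd


batch_conf = {
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "0",
    "spark.dynamicAllocation.initialExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "20",
}

command = build_spark_submit(batch_conf, "my_app.py")
print(" ".join(shlex.quote(part) for part in command))
```

The resulting list can be passed to `subprocess.run` if the command should be launched from Python.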

2. Enable Dynamic Allocation for Spark Streaming Applications

 

To enable dynamic allocation for Spark Streaming applications, use the following parameters:

 

| Parameter | Default Value | Description |
|---|---|---|
| spark.streaming.dynamicAllocation.enabled | false | Set this to true to enable dynamic allocation for Spark Streaming applications. |
| spark.streaming.dynamicAllocation.minExecutors | (none) | Set this to the minimum number of executors that should be allocated to the application. The value must be > 0. |
| spark.streaming.dynamicAllocation.maxExecutors | infinity | Set this to the maximum number of executors that should be allocated to the application. |

 

Note: Dynamic allocation cannot be enabled for both streaming and batch at the same time. When running a Spark Streaming application, first disable batch dynamic allocation (spark.dynamicAllocation.enabled=false).

spark-submit command:

 

spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.streaming.dynamicAllocation.minExecutors=1 \
--conf spark.streaming.dynamicAllocation.maxExecutors=20 

 

Note: In the above spark-submit command, the minimum number of executors is 1 and the maximum is 20.
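The two constraints stated in this section (batch and streaming dynamic allocation must not both be enabled, and the streaming minimum must be greater than 0) can be checked before submitting a job. The following is a hypothetical pre-flight check in plain Python; it is not part of Spark, and the function names are illustrative.

```python
def is_true(conf, key):
    """Treat a missing property as false, matching the documented defaults."""
    return str(conf.get(key, "false")).strip().lower() == "true"


def validate_streaming_allocation(conf):
    """Return a list of problems found in a Spark Streaming configuration."""
    problems = []
    if (is_true(conf, "spark.dynamicAllocation.enabled")
            and is_true(conf, "spark.streaming.dynamicAllocation.enabled")):
        problems.append("batch and streaming dynamic allocation are both enabled")
    min_executors = conf.get("spark.streaming.dynamicAllocation.minExecutors")
    if min_executors is not None and int(min_executors) <= 0:
        problems.append("spark.streaming.dynamicAllocation.minExecutors must be > 0")
    return problems


streaming_conf = {
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.enabled": "false",
    "spark.streaming.dynamicAllocation.enabled": "true",
    "spark.streaming.dynamicAllocation.minExecutors": "1",
    "spark.streaming.dynamicAllocation.maxExecutors": "20",
}
print(validate_streaming_allocation(streaming_conf))
```

An empty list means the configuration satisfies both constraints.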

3. Enable Dynamic Allocation for Spark Structured Streaming Applications

 

Unlike Spark Streaming, Spark Structured Streaming has no dedicated parameters for dynamic allocation, so the batch parameters must be used to enable it. An open Jira, SPARK-24815, tracks adding separate parameters for Spark Structured Streaming.

 

spark-submit command:

 

spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20 

 

Note: For Structured Streaming applications, the batch dynamic allocation parameters are not an efficient way to allocate resources, because they were not designed for streaming job patterns and work poorly for some types of applications.

References:

 

  1. https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
  2. https://docs.cloudera.com/runtime/latest/running-spark-applications/topics/spark-yarn-dynamic-alloca... 
  3. https://issues.apache.org/jira/browse/SPARK-12133 
  4. https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/sche...
  5. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/Str...
  6. https://issues.apache.org/jira/browse/SPARK-24815
  7. https://issues.apache.org/jira/browse/SPARK-3174
  8. https://issues.apache.org/jira/browse/SPARK-4922
  9. https://www.slideshare.net/databricks/dynamic-allocation-in-spark

 
