1. Introduction

Spark is an in-memory processing engine: the computation that a task performs happens in memory. Understanding Spark memory management therefore helps us develop Spark applications and tune their performance.

If too much memory is requested when submitting an application, cluster resources are wasted. If too little is requested, out-of-memory errors and full-GC problems occur easily.

Efficient memory use is therefore critical for good performance.

Spark Architecture:

cluster-overview

 

The Spark application includes two JVM processes: driver and executor.

  • The driver is the main control process, which is responsible for creating the SparkSession/SparkContext, submitting the job, converting the job to a task, and coordinating the task execution between executors.
  • The executor is mainly responsible for performing specific calculation tasks and returning the results to the driver.

The driver's memory management is relatively simple; Spark does not apply a specific memory model to it.

In this article, we analyze executor memory management.

2. Executor memory

The executor acts as a JVM process launched on a worker node. So, it is important to understand JVM memory management.

JVM memory management is categorized into two types:

  1. On-Heap Memory Management (In-Heap Memory): Objects are allocated on the JVM Heap and bound by GC.
  2. Off-Heap Memory Management (External Memory): Objects are allocated in memory outside the JVM by serialization, managed by the application, and are not bound by GC.
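To make the two categories concrete, here is a minimal Java sketch that allocates one buffer on the JVM heap and one outside it. It uses the JDK's direct ByteBuffer only as a stand-in for off-heap allocation; the class and method names are hypothetical, and this is not how Spark itself allocates off-heap memory.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class; not part of Spark.
final class HeapVsOffHeapSketch {

    /** Allocates a buffer backed by a byte[] on the JVM heap (subject to GC). */
    static ByteBuffer allocateOnHeap(int bytes) {
        return ByteBuffer.allocate(bytes);
    }

    /** Allocates a direct buffer whose contents live in native memory outside the heap. */
    static ByteBuffer allocateOffHeap(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    public static void main(String[] args) {
        ByteBuffer onHeap = allocateOnHeap(1024);
        ByteBuffer offHeap = allocateOffHeap(1024);

        // Both buffers are read and written through the same API.
        onHeap.putLong(0, 42L);
        offHeap.putLong(0, 42L);

        System.out.println("on-heap  isDirect = " + onHeap.isDirect());
        System.out.println("off-heap isDirect = " + offHeap.isDirect());
    }
}
```

The data in the direct buffer is not scanned or moved by the garbage collector, which is the property the off-heap category relies on.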

JVM Memory Types.jpg

In general, the objects' read and write speed is:

on-heap > off-heap > disk

3. Memory Management

Spark memory management is divided into two types:

  • Static Memory Manager (Static Memory Management), and
  • Unified Memory Manager (Unified memory management)

MemoryManagerTypes copy.jpg

 

Since Spark 1.6.0, Unified Memory Manager has been set as the default memory manager for Spark.

Static Memory Manager has been deprecated because of its lack of flexibility.

In both memory managers, a portion of the Java heap is allocated for processing Spark applications, while the rest of the memory is reserved for Java class references and metadata usage.

Note: There will only be one MemoryManager per JVM.

 

// Determine whether to use the old memory management mode
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)

val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    // The old version uses static memory management
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    // The new version uses unified memory management
    UnifiedMemoryManager(conf, numUsableCores)
  }

 

To store the information about the memory, both memory managers will use two memory pools, i.e.

  • ExecutionMemoryPool and
  • StorageMemoryPool

4. Static Memory Manager (SMM)

From Spark 1.0, May 2014

Static Memory Manager (SMM) is the traditional model and simple scheme for memory management.

It divides memory into two fixed partitions statically.

The size of storage memory, execution memory, and other memory is fixed during application processing, but users can configure it before the application starts.

Note: This memory allocation method was removed in Spark 3.0.

Advantage:

  • The static Manager mechanism is simple to implement

Disadvantage: 

  • Even when storage memory has free space, execution cannot use it, so data spills to disk as soon as execution memory is full (and vice versa).

In Spark 1.6+, static memory management can be enabled via the spark.memory.useLegacyMode=true parameter.

Parameter Description
spark.memory.useLegacyMode (default false) The option to divide heap space into fixed-size regions.
spark.shuffle.memoryFraction (default 0.2) The fraction of the heap used for aggregation and cogrouping during shuffles. It works only if spark.memory.useLegacyMode=true.
spark.storage.memoryFraction (default 0.6) The fraction of the heap used for Spark’s memory cache. It works only if spark.memory.useLegacyMode=true.
spark.storage.unrollFraction (default 0.2) The fraction of spark.storage.memoryFraction used for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when there is not enough free storage space to unroll the new block in its entirety. It works only if spark.memory.useLegacyMode=true.
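As a rough illustration of how these fractions carve up a heap, the Java sketch below computes the nominal size of each static region from the defaults in the table. The class and method names are hypothetical, and the sketch deliberately ignores the additional safety margins that the real StaticMemoryManager applies, so the actual regions are somewhat smaller.

```java
// Hypothetical illustration class; nominal sizes only, no safety margins.
final class StaticRegionsSketch {

    /** Nominal storage region: heap * spark.storage.memoryFraction. */
    static long storageBytes(long heapBytes, double storageMemoryFraction) {
        return Math.round(heapBytes * storageMemoryFraction);
    }

    /** Nominal shuffle (execution) region: heap * spark.shuffle.memoryFraction. */
    static long shuffleBytes(long heapBytes, double shuffleMemoryFraction) {
        return Math.round(heapBytes * shuffleMemoryFraction);
    }

    /** Part of the storage region usable for unrolling: storage * spark.storage.unrollFraction. */
    static long unrollBytes(long storageBytes, double unrollFraction) {
        return Math.round(storageBytes * unrollFraction);
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long heap = 4096L * mib; // example heap size, chosen arbitrarily

        long storage = storageBytes(heap, 0.6);
        long shuffle = shuffleBytes(heap, 0.2);
        long unroll = unrollBytes(storage, 0.2);

        System.out.println("storage = " + storage / mib + " MB");
        System.out.println("shuffle = " + shuffle / mib + " MB");
        System.out.println("unroll  = " + unroll / mib + " MB");
    }
}
```

Because each region is computed once from the heap size, free space in one region cannot be lent to the other, which is the inflexibility described above.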

 

Static memory management does not support the use of off-heap memory for storage, so all off-heap memory is allocated to the execution space.

5. Unified Memory Manager (UMM)

From Spark 1.6+, January 2016

Since Spark 1.6.0, a new memory manager has been adopted to replace the static memory manager and provide Spark with dynamic memory allocation.

It allocates a region of memory as a unified memory container that is shared by storage and execution.

When execution memory is not used, the storage memory can acquire all the available memory, and vice versa.

If any of the storage or execution memory needs more space, a function called acquireMemory() will expand one of the memory pools and shrink another one.

Borrowed storage memory can be evicted at any given time. Borrowed execution memory, however, will not be evicted in the first design due to complexities in implementation.

Advantages:

  1. The boundary between storage memory and execution memory is not static, and in cases of memory pressure, the boundary would be moved, i.e., one region would grow by borrowing space from another one.
  2. When the application does not cache any data, execution can use the entire region, which avoids unnecessary disk spills.
  3. When the application does cache data, a minimum amount of storage memory is reserved in which cached data blocks are protected from eviction.
  4. This approach provides reasonable out-of-the-box performance for a variety of workloads without requiring user expertise in how memory is divided internally.

JVM has two types of memory:

  1. On-Heap Memory
  2. Off-Heap Memory

In addition to the above two JVM memory types, there is one more segment of memory that is accessed by Spark, i.e., external process memory. This kind of memory is mainly used for PySpark and SparkR applications. This is the memory used by the Python/R process, which resides outside the JVM.

5.1 On-Heap Memory

By default, Spark uses on-heap memory only. The size of the on-heap memory is configured by the --executor-memory or spark.executor.memory parameter when the Spark application starts.

The concurrent tasks running inside Executor share the JVM's on-heap memory.

Two main configurations control executor memory allocation:

Parameter Description
spark.memory.fraction (default 0.6) Fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data evictions occur. The purpose of this configuration is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
spark.memory.storageFraction (default 0.5) The size of the storage region within the space set aside by spark.memory.fraction. Cached data may only be evicted if total storage exceeds this region.

Note: In Spark 1.6, the default spark.memory.fraction value was 0.75 and the default spark.memory.storageFraction value was 0.5.

Apache Spark supports three memory regions:

  • Reserved Memory
  • User Memory
  • Spark Memory

Cd_Unified_Memory_Manager_Regions.jpg

Reserved Memory:

Reserved memory is the memory reserved for the system and is used to store Spark's internal objects.

As of Spark v1.6.0+, the value is 300MB. That means 300MB of RAM does not participate in Spark memory region size calculations (SPARK-12081).

Reserved memory’s size is hard coded, and its size cannot be changed in any way without Spark recompilation or setting spark.testing.reservedMemory, which is not recommended as it is a testing parameter not intended to be used in production.

Formula:

RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 BYTES = 300 MB

UnifiedMemoryManager.scala

Note: If the executor memory is less than 1.5 times the reserved memory (1.5 * reserved memory = 450MB), then the Spark job will fail with the following exception message:

 

spark-shell --executor-memory 300m
21/06/21 03:55:51 ERROR repl.Main: Failed to initialize Spark session.
java.lang.IllegalArgumentException: Executor memory 314572800 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
        at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:225)
        at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)
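The numbers in this error message follow directly from the 1.5 * reserved memory rule. The Java sketch below reproduces that arithmetic; the class and method names are hypothetical and the check is a simplified imitation, not Spark's own code.

```java
// Hypothetical illustration class; mimics the minimum-memory rule described above.
final class MinExecutorMemoryCheck {

    static final long RESERVED_SYSTEM_MEMORY_BYTES = 300L * 1024 * 1024;

    /** Minimum heap accepted: 1.5 times the reserved memory, rounded up. */
    static long minSystemMemory() {
        return (long) Math.ceil(RESERVED_SYSTEM_MEMORY_BYTES * 1.5);
    }

    /** Throws if the given executor memory is below the minimum. */
    static void validate(long executorMemoryBytes) {
        long min = minSystemMemory();
        if (executorMemoryBytes < min) {
            throw new IllegalArgumentException(
                "Executor memory " + executorMemoryBytes + " must be at least " + min + ".");
        }
    }

    public static void main(String[] args) {
        System.out.println("minimum = " + minSystemMemory() + " bytes");
        try {
            validate(300L * 1024 * 1024); // same value as --executor-memory 300m
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With 300m the executor memory is 314572800 bytes, which is below the 471859200-byte (450 MB) minimum, matching the message in the log.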

 


User Memory:

User Memory is the memory used to store user-defined data structures, Spark internal metadata, any UDFs created by the user, and the data needed for RDD conversion operations, such as RDD dependency information, etc.

For example, we can rewrite Spark aggregation by using the mapPartitions transformation to maintain a hash table for this aggregation to run, which would consume so-called user memory.

This memory segment is not managed by Spark. Spark will not be aware of or maintain this memory segment.
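The hash-table aggregation mentioned above can be sketched without any Spark dependency. The method below has the shape of a function one might pass to mapPartitions: it receives an iterator over one partition's records and builds its own HashMap. The class name, method name, and sample data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; plain Java, no Spark classes involved.
final class PartitionAggregationSketch {

    /**
     * Sums values per key for one partition. The HashMap is an ordinary
     * user-created object, so in a Spark task it would occupy user memory
     * that Spark neither tracks nor spills.
     */
    static Map<String, Long> sumByKey(Iterator<Map.Entry<String, Long>> records) {
        Map<String, Long> totals = new HashMap<>();
        while (records.hasNext()) {
            Map.Entry<String, Long> record = records.next();
            totals.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> partition = Arrays.asList(
            new SimpleEntry<String, Long>("a", 1L),
            new SimpleEntry<String, Long>("b", 2L),
            new SimpleEntry<String, Long>("a", 3L));

        Map<String, Long> totals = sumByKey(partition.iterator());
        System.out.println("a = " + totals.get("a"));
        System.out.println("b = " + totals.get("b"));
    }
}
```

If a partition contains many distinct keys, this map grows without Spark being able to spill it, which is why large user data structures are a common source of out-of-memory errors.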

Formula:

(Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction)

Spark Memory (Unified Memory):

Spark Memory is the memory pool managed by Apache Spark. Spark Memory is responsible for storing intermediate states while doing task execution like joins or storing broadcast variables.

All the cached or persistent data will be stored in this segment, specifically in the storage memory of this segment.

Formula:

(Java Heap - Reserved Memory) * spark.memory.fraction

Spark tasks operate in two main memory regions:

  • Execution: Used for shuffles, joins, sorts, and aggregations.
  • Storage: Used to cache partitions of data.

The boundary between them is set by spark.memory.storageFraction parameter, which defaults to 0.5 or 50%.

Storage Memory:

Storage Memory is used for storing all of the cached data, broadcast variables, unrolled data, etc. “Unroll” is essentially a process of deserializing serialized data.

Any persistent option that includes memory in it will store that data in this segment.

Spark clears space for new cache requests by removing old cached objects based on the Least Recently Used (LRU) mechanism.
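The LRU idea can be illustrated with a small Java sketch built on an access-ordered LinkedHashMap. It is a toy model under stated assumptions: the class name, the block IDs, and the byte-based capacity are hypothetical, and the sketch simply drops evicted blocks instead of writing them to disk.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy cache; not Spark's storage implementation.
final class LruBlockCacheSketch {

    private final long capacityBytes;
    private long usedBytes = 0;
    // accessOrder = true keeps the least recently used entry first in iteration order.
    private final LinkedHashMap<String, Long> blocks = new LinkedHashMap<>(16, 0.75f, true);

    LruBlockCacheSketch(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** Caches a block, evicting least recently used blocks until it fits. */
    boolean put(String blockId, long sizeBytes) {
        if (sizeBytes > capacityBytes) {
            return false; // can never fit, even in an empty cache
        }
        Long previous = blocks.remove(blockId);
        if (previous != null) {
            usedBytes -= previous;
        }
        Iterator<Map.Entry<String, Long>> it = blocks.entrySet().iterator();
        while (usedBytes + sizeBytes > capacityBytes && it.hasNext()) {
            usedBytes -= it.next().getValue();
            it.remove();
        }
        blocks.put(blockId, sizeBytes);
        usedBytes += sizeBytes;
        return true;
    }

    /** Reads a block, which marks it as most recently used. */
    boolean touch(String blockId) {
        return blocks.get(blockId) != null;
    }

    /** Checks presence without changing the access order. */
    boolean contains(String blockId) {
        return blocks.containsKey(blockId);
    }

    public static void main(String[] args) {
        LruBlockCacheSketch cache = new LruBlockCacheSketch(100);
        cache.put("rdd_0_0", 40);
        cache.put("rdd_0_1", 40);
        cache.touch("rdd_0_0");   // rdd_0_1 is now the least recently used block
        cache.put("rdd_0_2", 40); // does not fit, so rdd_0_1 is evicted
        System.out.println("rdd_0_0 cached: " + cache.contains("rdd_0_0"));
        System.out.println("rdd_0_1 cached: " + cache.contains("rdd_0_1"));
        System.out.println("rdd_0_2 cached: " + cache.contains("rdd_0_2"));
    }
}
```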

Once the cached data is out of storage, it is either written to disk or recomputed based on configuration. Broadcast variables are stored in the cache at the MEMORY_AND_DISK persistent level. This is where we store cached data, which is long-lived.

Formula:

(Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction

Execution Memory:

Execution Memory is used for storing the objects required during the execution of Spark tasks.

For example, it is used to store the shuffle intermediate buffer on the map side in memory. Also, it is used to store a hash table for the hash aggregation step.

This pool also supports spilling on disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).

Execution memory tends to be more short-lived than storage memory. It is released immediately after each operation, making space for the next ones.

Formula:

(Java Heap - Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)

In Spark 1.6+, there is no hard boundary between execution memory and storage memory.

Due to the nature of execution memory, blocks cannot be forcefully evicted from this pool; otherwise, execution will break since the block it refers to won’t be found.

But when it comes to storage memory, blocks can be evicted from memory and written to disk or recomputed (if the persistence level is MEMORY_ONLY) as required.

Storage and execution pool borrowing rules:

  1. Storage memory can borrow space from execution memory only if that space is not currently used by execution.
  2. Execution memory can likewise borrow space from storage memory if that space is not currently used by storage.
  3. If storage has borrowed space from execution memory and execution needs more memory, execution can forcefully evict the excess blocks that storage occupies.
  4. If execution has borrowed space from storage memory and storage needs more memory, storage cannot forcefully evict the excess space that execution occupies; it has to make do with less memory. It waits until Spark releases the excess execution memory and then occupies it.
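These borrowing rules can be modeled with a few lines of bookkeeping. The Java sketch below is a toy model under simplifying assumptions: sizes are plain byte counters, "eviction" just lowers the storage counter, storage never evicts its own blocks, and all names are hypothetical. It is not Spark's UnifiedMemoryManager.

```java
// Hypothetical toy model of the unified region; counters only, no real blocks.
final class UnifiedPoolSketch {

    private final long totalBytes;         // size of the unified Spark memory region
    private final long storageRegionBytes; // storage share protected from eviction
    private long storageUsed = 0;
    private long executionUsed = 0;

    UnifiedPoolSketch(long totalBytes, double storageFraction) {
        this.totalBytes = totalBytes;
        this.storageRegionBytes = (long) (totalBytes * storageFraction);
    }

    /** Rules 1 and 4: storage may take any free space but never evicts execution. */
    boolean acquireStorage(long bytes) {
        long free = totalBytes - storageUsed - executionUsed;
        if (bytes > free) {
            return false;
        }
        storageUsed += bytes;
        return true;
    }

    /** Rules 2 and 3: execution takes free space, then evicts storage above its protected share. */
    long acquireExecution(long bytes) {
        long free = totalBytes - storageUsed - executionUsed;
        if (bytes > free) {
            long evictable = Math.max(0, storageUsed - storageRegionBytes);
            long evicted = Math.min(evictable, bytes - free);
            storageUsed -= evicted;
            free += evicted;
        }
        long granted = Math.min(bytes, free);
        executionUsed += granted;
        return granted;
    }

    void releaseExecution(long bytes) {
        executionUsed -= Math.min(bytes, executionUsed);
    }

    long storageUsed() { return storageUsed; }
    long executionUsed() { return executionUsed; }

    public static void main(String[] args) {
        UnifiedPoolSketch pool = new UnifiedPoolSketch(1000, 0.5);
        System.out.println("storage 800 fits: " + pool.acquireStorage(800));
        System.out.println("execution granted: " + pool.acquireExecution(400));
        System.out.println("storage left after eviction: " + pool.storageUsed());
    }
}
```

In this model a request for execution memory can shrink cached data down to the protected storage share, while a storage request that does not fit simply fails until execution releases memory.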

Calculate the memory for 5 GB of executor memory:

To calculate reserved memory, user memory, spark memory, storage memory, and execution memory, we will use the following parameters:

spark.executor.memory=5g
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5

Java Heap Memory = 5 GB
                 = 5 * 1024 MB
                 = 5120 MB

Reserved Memory = 300 MB

Usable Memory = (Java Heap Memory - Reserved Memory)
              = 5120 MB - 300 MB
              = 4820 MB

User Memory = Usable Memory * (1.0 - spark.memory.fraction)
            = 4820 MB * (1.0 - 0.6)
            = 4820 MB * 0.4
            = 1928 MB

Spark Memory = Usable Memory * spark.memory.fraction
             = 4820 MB * 0.6
             = 2892 MB

Spark Storage Memory = Spark Memory * spark.memory.storageFraction
                     = 2892 MB * 0.5
                     = 1446 MB

Spark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction)
                       = 2892 MB * (1.0 - 0.5)
                       = 2892 MB * 0.5
                       = 1446 MB

Cd_Unified_Memory_Manager_5GB.jpg

Reserved Memory: 300 MB (5.86% of the heap)
User Memory: 1928 MB (37.66% of the heap)
Spark Memory: 2892 MB (56.48% of the heap)

5.2 Off-Heap Memory (External memory)

Off-heap memory means allocating objects (serialized to byte arrays) in memory outside the heap of the Java virtual machine (JVM). This memory is managed directly by the operating system rather than by the JVM; it lives outside the process heap in native memory and is therefore not processed by the garbage collector.

The result of this is to keep a smaller heap to reduce the impact of garbage collection on the application.

Accessing this data is slightly slower than accessing the on-heap storage, but still faster than reading/writing from a disk. The downside is that the user has to manually deal with managing the allocated memory.

In this model, memory is not requested from the JVM heap. Instead, Spark calls Java's Unsafe API, which, like malloc() in C, requests memory directly from the operating system. Because this memory bypasses JVM memory management, it avoids frequent GC. The disadvantage is that the application must implement its own logic for allocating and releasing memory.

Spark 1.6+ began to introduce Off-heap memory (SPARK-11389). Unified Memory Manager can optionally be allocated using off-heap memory.

Parameter Description
spark.memory.offHeap.enabled (default false) The option to use off-heap memory for certain operations.
spark.memory.offHeap.size (default 0) The total amount of memory in bytes for off-heap allocation. It has no impact on heap memory usage, so make sure not to exceed your executor’s total limits.

By default, off-heap memory is disabled. We can enable it with the spark.memory.offHeap.enabled parameter and set its size with the spark.memory.offHeap.size parameter.

spark-shell \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g

Off-heap memory supports OFF_HEAP persistence level. Compared to the on-heap memory, the model of the off-heap memory is relatively simple, including only Storage Memory and Execution Memory.

Cd_OffHeap_Memory_Model.jpg

If the off-heap memory is enabled, there will be both on-heap and off-heap memory in the Executor.

Screenshot 2021-06-09 at 1.45.27 PM.png

 Spark UI with OffHeap Enabled

The Execution Memory in the Executor is the sum of the Execution memory inside the heap and the Execution memory outside the heap. The same is true for Storage Memory.

Note: Project Tachyon supports in-memory storage for Spark RDDs in off-heap space.

Project Tungsten supports storing shuffle objects in off-heap space.

Advantage(s):

  • It reduces on-heap memory usage and the frequency of GC, which can improve program performance.
  • When an executor is killed, all cached data for that executor is lost, but with off-heap memory the data persists. The lifetime of the JVM and the lifetime of cached data are decoupled.

Disadvantage(s):

  • Using OFF_HEAP does not back up data, nor can it guarantee high data availability like Alluxio does, and data loss requires recalculation.

6. Understand the Memory Allocation using Spark UI 

6.1 Using On Heap Memory:

Let's launch the Spark shell with 5GB On Heap Memory to understand the Storage Memory in Spark UI.

spark-shell \
--driver-memory 5g \
--executor-memory 5g

The available Storage Memory displayed on the Spark UI Executors tab is 2.7 GB, as follows:

Screenshot 2021-06-08 at 7.02.36 PM.png

Based on our 5GB calculation, we can see the following memory values:

Java Heap Memory       = 5 GB
Reserved Memory = 300 MB
Usable Memory = 4820 MB
User Memory   = 1928 MB
Spark Memory = 2892 MB = 2.8242 GB
Spark Storage Memory = 1446 MB = 1.4121 GB
Spark Execution Memory = 1446 MB = 1.4121 GB

In the Spark UI, the Storage Memory value is 2.7 GB, while our calculation gives 1.4121 GB. The two values do not match because the Storage Memory shown in the Spark UI is the sum of Storage Memory and Execution Memory.

Storage Memory = Spark Storage Memory + Spark Execution Memory
= 1.4121 GB + 1.4121 GB
= 2.8242 GB

The Spark UI Storage Memory (2.7 GB) still does not match the value calculated above (2.8242 GB). Although we set --executor-memory to 5g, the memory that Spark's executor obtains through Runtime.getRuntime.maxMemory is 4772593664 bytes, so the Java Heap Memory is only 4772593664 bytes.
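The gap between the configured heap and the value Spark sees can be checked on any JVM with a small Java probe such as the one below. The class and method names are hypothetical. The usual explanation, stated here as an assumption about typical HotSpot behavior, is that depending on the garbage collector part of the heap (for example a survivor space) is not counted in this figure.

```java
// Hypothetical probe class; run with e.g. -Xmx5g and compare the output with the flag.
final class MaxMemoryProbe {

    /** Maximum heap the JVM will attempt to use, in bytes. */
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    /** Converts bytes to 1024-based megabytes. */
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long max = maxHeapBytes();
        System.out.println("maxMemory = " + max + " bytes");
        System.out.println("          = " + toMiB(max) + " MB (1024-based)");
    }
}
```

For the 4772593664 bytes quoted above, the 1024-based conversion gives 4551.5 MB, which the calculation below truncates to 4551 MB.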

Java Heap Memory       = 4772593664 bytes = 4772593664/(1024 * 1024) = 4551 MB 
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4551 - 300) MB = 4251 MB
User Memory   = (Usable Memory * (1 -spark.memory.fraction)) = 1700.4 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 2550.6 MB
Spark Storage Memory = 1275.3 MB
Spark Execution Memory = 1275.3 MB

Spark Memory (2550.6 MB/2.4908 GB) still does not match what is displayed on the Spark UI (2.7 GB) because we converted the Java Heap Memory bytes into MB by dividing by 1024 * 1024, whereas the Spark UI converts bytes by dividing by 1000 * 1000.

Java Heap Memory       = 4772593664 bytes = 4772593664/(1000 * 1000) = 4772.593664 MB
Reserved Memory = 300 * 1024 * 1024 bytes = 314572800/(1000 * 1000) = 314.5728 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4772.593664 - 314.5728) MB = 4458.020864 MB
User Memory   = (Usable Memory * (1 - spark.memory.fraction)) = 1783.2083456 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 2674.8125184 MB = ~ 2.7 GB
Spark Storage Memory = 1337.4062592 MB
Spark Execution Memory = 1337.4062592 MB

Logic for converting bytes into GB:

Spark 2.X

 

function formatBytes(bytes, type) {
    if (type !== 'display') return bytes;
    if (bytes == 0) return '0.0 B';
    var k = 1000;
    var dm = 1;
    var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
    var i = Math.floor(Math.log(bytes) / Math.log(k));
    return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}

 

Spark 3.X

 

function formatBytes(bytes, type) {
  if (type !== 'display') return bytes;
  if (bytes <= 0) return '0.0 B';
  var k = 1024;
  var dm = 1;
  var sizes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
  var i = Math.floor(Math.log(bytes) / Math.log(k));
  return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}
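To compare the two conversions side by side, the following Java sketch ports the idea of both formatBytes variants into one method. It is a simplified port with hypothetical names: it always prints one decimal place and finds the unit with a loop rather than logarithms, so edge cases may format slightly differently from the UI.

```java
import java.util.Locale;

// Hypothetical simplified port of the two UI formatters shown above.
final class ByteFormatSketch {

    private static final String[] DECIMAL_UNITS = {"B", "KB", "MB", "GB", "TB", "PB", "EB"};
    private static final String[] BINARY_UNITS = {"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"};

    /** Formats bytes with base 1000 (Spark 2.x style) or base 1024 (Spark 3.x style). */
    static String format(long bytes, boolean binary) {
        if (bytes <= 0) {
            return "0.0 B";
        }
        int k = binary ? 1024 : 1000;
        String[] units = binary ? BINARY_UNITS : DECIMAL_UNITS;
        double value = bytes;
        int i = 0;
        // Divide until the value drops below the base or the largest unit is reached.
        while (value >= k && i < units.length - 1) {
            value /= k;
            i++;
        }
        return String.format(Locale.ROOT, "%.1f %s", value, units[i]);
    }

    public static void main(String[] args) {
        long heapBytes = 4772593664L; // maxMemory value quoted in this article
        System.out.println("base 1000: " + format(heapBytes, false));
        System.out.println("base 1024: " + format(heapBytes, true));
    }
}
```

The same byte count therefore shows up as 4.8 GB with the base-1000 formatter and as 4.4 GiB with the base-1024 formatter.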

 

6.2 Using On Heap Memory + Off Heap Memory:

Let's launch the spark shell with 1GB On Heap memory and 5GB Off Heap memory to understand the Storage Memory.

spark-shell \
--driver-memory 1g \
--executor-memory 1g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g

The available Storage Memory displayed on the Spark UI Executors tab is 5.8 GB, as follows:

Screenshot 2021-06-09 at 1.45.27 PM.png

 

Storage Memory = On Heap Memory + Off Heap Memory

 

Memory Calculation:

On Heap Memory

 

Java Heap Memory       = 954728448 bytes = 954728448/(1000 * 1000) = 954.728448 MB
Reserved Memory        = 300 * 1024 * 1024 bytes = 314572800/(1000 * 1000) = 314.5728 MB
Usable Memory          = (Java Heap Memory - Reserved Memory) = (954.728448 - 314.5728) MB = 640.155648 MB
User Memory            = (Usable Memory * (1 - spark.memory.fraction)) = 256.0622592 MB
Spark Memory           = (Usable Memory * spark.memory.fraction) = 384.0933888 MB
Spark Storage Memory   = 192.0466944 MB
Spark Execution Memory = 192.0466944 MB

Off Heap Memory

spark.memory.offHeap.size = 5g = 5 * 1024 * 1024 * 1024 bytes = 5368709120 bytes = 5368.70912 MB

Storage Memory

Storage Memory = On Heap Memory + Off Heap Memory
               = 384.0933888 MB + 5368.70912 MB
               = 5752.8025088 MB
               = ~ 5.8 GB
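Working in bytes throughout avoids mixing 1000-based and 1024-based megabytes. The Java sketch below assumes that the reserved memory is 300 * 1024 * 1024 bytes (as in RESERVED_SYSTEM_MEMORY_BYTES) and that 5g means 5 * 1024 * 1024 * 1024 bytes; under those assumptions it arrives at roughly the 5.8 GB that the Spark UI reports for this setup. The class and method names are hypothetical.

```java
// Hypothetical illustration class; all arithmetic is done in bytes.
final class UiStorageMemorySketch {

    static final long RESERVED_SYSTEM_MEMORY_BYTES = 300L * 1024 * 1024;

    /** On-heap Spark memory: (heap - reserved) * spark.memory.fraction. */
    static long onHeapSparkMemory(long maxHeapBytes, double memoryFraction) {
        return (long) ((maxHeapBytes - RESERVED_SYSTEM_MEMORY_BYTES) * memoryFraction);
    }

    /** Total shown as Storage Memory: on-heap Spark memory plus the off-heap size. */
    static long uiStorageMemory(long maxHeapBytes, double memoryFraction, long offHeapBytes) {
        return onHeapSparkMemory(maxHeapBytes, memoryFraction) + offHeapBytes;
    }

    public static void main(String[] args) {
        long maxHeap = 954728448L;              // maxMemory with --executor-memory 1g
        long offHeap = 5L * 1024 * 1024 * 1024; // spark.memory.offHeap.size=5g

        long total = uiStorageMemory(maxHeap, 0.6, offHeap);
        System.out.println("total bytes  = " + total);
        System.out.println("base-1000 GB = " + total / 1_000_000_000.0);
    }
}
```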

 

Simple Java Program to calculate the Spark Memory:

 

// JVM Arguments: -Xmx5g
public class SparkMemoryCalculation {

    private static final long MB = 1024 * 1024;
    private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * MB;
    private static final double SPARK_MEMORY_STORAGE_FRACTION = 0.5;
    private static final double SPARK_MEMORY_FRACTION = 0.6;

    public static void main(String[] args) {

        // Heap size as reported by the JVM; usually somewhat less than -Xmx
        long systemMemory = Runtime.getRuntime().maxMemory();
        long usableMemory = systemMemory - RESERVED_SYSTEM_MEMORY_BYTES;
        long sparkMemory = toLong(usableMemory * SPARK_MEMORY_FRACTION);
        long userMemory = toLong(usableMemory * (1 - SPARK_MEMORY_FRACTION));

        long storageMemory = toLong(sparkMemory * SPARK_MEMORY_STORAGE_FRACTION);
        long executionMemory = toLong(sparkMemory * (1 - SPARK_MEMORY_STORAGE_FRACTION));

        // Values in 1024-based MB
        printMemoryInMB("Heap Memory\t\t", systemMemory);
        printMemoryInMB("Reserved Memory", RESERVED_SYSTEM_MEMORY_BYTES);
        printMemoryInMB("Usable Memory\t", usableMemory);
        printMemoryInMB("User Memory\t\t", userMemory);
        printMemoryInMB("Spark Memory\t", sparkMemory);

        printMemoryInMB("Storage Memory\t", storageMemory);
        printMemoryInMB("Execution Memory", executionMemory);

        // Values in 1000-based MB, as displayed by the Spark 2.x UI
        System.out.println();
        printStorageMemoryInMB("Spark Storage Memory", sparkMemory);
        printStorageMemoryInMB("Storage Memory UI \t", storageMemory);
        printStorageMemoryInMB("Execution Memory UI", executionMemory);
    }

    private static void printMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / MB) + " MB");
    }

    private static void printStorageMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / (1000 * 1000)) + " MB");
    }

    // Truncates the fractional part of the value
    private static long toLong(double val) {
        return (long) val;
    }
}

 

Thanks for visiting this article. If you liked this article, give kudos.

Comments
Contributor

Thank Ranga for such great explanation and your efforts for this content delivery

Super Collaborator

Thank You @RangaReddy for this detailed write-up. The level of detailing is awesome 👏

Cloudera Employee

Thank you @RangaReddy  It's an excellent article.

Cloudera Employee

Thank you...