Created on 06-09-2021 08:56 PM - edited on 01-25-2024 01:21 AM by VidyaSargur
Spark is an in-memory processing engine: all of the computation that a task does happens in memory. It is therefore important to understand Spark memory management, both for developing Spark applications and for performance tuning. If the memory allocation is too large, it needlessly occupies cluster resources; if it is too small, memory overflow and full-GC problems occur easily. Efficient memory use is critical for good performance, and inefficient memory use leads directly to bad performance.
Spark Architecture:
The Spark application includes two JVM processes: the driver and the executor. The driver's memory management is relatively simple, and Spark does not make specific plans for it, so this article analyzes executor memory management. The executor is a JVM process launched on a worker node, so it is also important to understand JVM memory management.
JVM memory management is categorized into two types: on-heap memory and off-heap memory.
In general, object read and write speed is:
on-heap > off-heap > disk
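The on-heap/off-heap distinction can be illustrated in plain Java (a small standalone sketch, not Spark code): ByteBuffer.allocate creates a GC-managed buffer inside the heap, while ByteBuffer.allocateDirect requests native memory outside it.

```java
import java.nio.ByteBuffer;

public class HeapVsOffHeap {
    public static void main(String[] args) {
        // On-heap: backed by a byte[] inside the JVM heap, tracked by the GC
        ByteBuffer onHeap = ByteBuffer.allocate(1024);
        // Off-heap: native memory outside the heap, invisible to the GC
        ByteBuffer offHeap = ByteBuffer.allocateDirect(1024);

        System.out.println("on-heap  isDirect = " + onHeap.isDirect());  // false
        System.out.println("off-heap isDirect = " + offHeap.isDirect()); // true
    }
}
```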
Spark memory management is divided into two types: Static Memory Manager and Unified Memory Manager.
Since Spark 1.6.0, Unified Memory Manager has been set as the default memory manager for Spark.
Static Memory Manager has been deprecated because of its lack of flexibility.
In both memory managers, a portion of the Java heap is allocated for processing Spark applications, while the rest of the memory is reserved for Java class references and metadata usage.
Note: There will only be one MemoryManager per JVM.
// Determine whether to use the old memory management mode
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    // The old version uses static memory management
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    // The new version uses unified memory management
    UnifiedMemoryManager(conf, numUsableCores)
  }
To store information about memory, both memory managers use two memory pools: the Storage Memory pool and the Execution Memory pool.
From Spark 1.0, May 2014
Static Memory Manager (SMM) is the traditional model and simple scheme for memory management.
It divides memory into two fixed partitions statically.
The size of storage memory, execution memory, and other memory is fixed during application processing, but users can configure it before the application starts.
Note: This memory allocation method was removed in Spark 3.0.
Advantage: the scheme is simple to implement and predictable, since each region's size is fixed.
Disadvantage: it is not flexible; even if storage memory has free space, execution memory cannot use it (and vice versa), which easily leads to wasted memory.
In Spark 1.6+, static memory management can be enabled via the spark.memory.useLegacyMode=true parameter.
Parameter | Description |
spark.memory.useLegacyMode (default false) | The option to divide heap space into fixed-size regions |
spark.shuffle.memoryFraction (default 0.2) | The fraction of the heap used for aggregation and cogrouping during shuffles. It works only if spark.memory.useLegacyMode=true |
spark.storage.memoryFraction (default 0.6) | The fraction of the heap used for Spark’s memory cache. It works only if spark.memory.useLegacyMode=true |
spark.storage.unrollFraction (default 0.2) | The fraction of spark.storage.memoryFraction used for unrolling blocks in memory. This is dynamically allocated by dropping existing blocks when there is not enough free storage space to unroll the new block in its entirety. It works only if spark.memory.useLegacyMode=true. |
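As a rough sketch of how the legacy fractions above carve up the heap, assuming a 4 GB executor heap with the default values (note: this ignores the additional internal safety factors Spark applies, so the real usable sizes are somewhat smaller):

```java
public class StaticMemoryLayout {
    public static void main(String[] args) {
        long heap = 4L * 1024 * 1024 * 1024;  // 4 GB executor heap (assumed)
        double shuffleFraction = 0.2;         // spark.shuffle.memoryFraction
        double storageFraction = 0.6;         // spark.storage.memoryFraction
        double unrollFraction  = 0.2;         // spark.storage.unrollFraction

        long shuffleMem = (long) (heap * shuffleFraction);
        long storageMem = (long) (heap * storageFraction);
        long unrollMem  = (long) (storageMem * unrollFraction); // carved out of storage

        System.out.println("Shuffle memory : " + shuffleMem / (1024 * 1024) + " MB");
        System.out.println("Storage memory : " + storageMem / (1024 * 1024) + " MB");
        System.out.println("Unroll memory  : " + unrollMem / (1024 * 1024) + " MB");
    }
}
```

Because the partitions are fixed, the 819 MB shuffle region cannot be reused for caching even when it sits idle, which is exactly the inflexibility that led to this manager's deprecation.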
Static memory management does not support the use of off-heap memory for storage, so all of it is allocated to the execution space.
From Spark 1.6+, January 2016
Since Spark 1.6.0, a new memory manager has been adopted to replace the static memory manager and provide Spark with dynamic memory allocation.
It allocates a region of memory as a unified memory container that is shared by storage and execution.
When execution memory is not used, the storage memory can acquire all the available memory, and vice versa.
If either storage or execution memory needs more space, a function called acquireMemory() expands one of the memory pools and shrinks the other.
Borrowed storage memory can be evicted at any given time. Borrowed execution memory, however, will not be evicted in the first design due to complexities in implementation.
Advantages: the boundary between storage and execution memory is not static; either side can borrow free memory from the other, so overall memory utilization is higher and less manual tuning is required.
JVM has two types of memory: on-heap memory and off-heap memory.
In addition to the above two JVM memory types, there is one more segment of memory that is accessed by Spark, i.e., external process memory. This kind of memory is mainly used for PySpark and SparkR applications. This is the memory used by the Python/R process, which resides outside the JVM.
By default, Spark uses on-heap memory only. The size of the on-heap memory is configured by the --executor-memory or spark.executor.memory parameter when the Spark application starts.
The concurrent tasks running inside Executor share the JVM's on-heap memory.
Two main configurations control executor memory allocation:
Parameter | Description |
spark.memory.fraction (default 0.6) | Fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data evictions occur. The purpose of this configuration is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. |
spark.memory.storageFraction (default 0.5) | The size of the storage region within the space set aside by spark.memory.fraction. Cached data may only be evicted if total storage exceeds this region. |
Note: In Spark 1.6, the default spark.memory.fraction value was 0.75 and the default spark.memory.storageFraction value was 0.5.
Apache Spark supports three memory regions: Reserved Memory, User Memory, and Spark Memory.
Reserved memory is the memory reserved for the system and is used to store Spark's internal objects.
As of Spark v1.6.0+, the value is 300MB. That means 300MB of RAM does not participate in Spark memory region size calculations (SPARK-12081).
Reserved memory's size is hard-coded and cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended since it is a testing parameter not intended for production use.
Formula:
RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024 BYTES = 300 MB
Note: If the executor memory is less than 1.5 times the reserved memory (1.5 * 300 MB = 450 MB), the Spark job will fail with the following exception:
spark-shell --executor-memory 300m
21/06/21 03:55:51 ERROR repl.Main: Failed to initialize Spark session.
java.lang.IllegalArgumentException: Executor memory 314572800 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:225)
at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)
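The check that produces this error can be sketched as follows (a simplified reconstruction of the logic in UnifiedMemoryManager.getMaxMemory, not the exact Spark source):

```java
public class MinMemoryCheck {
    static final long RESERVED = 300L * 1024 * 1024;               // 300 MB reserved
    static final long MIN_SYSTEM_MEMORY = (long) (RESERVED * 1.5); // 450 MB floor

    static void checkExecutorMemory(long systemMemory) {
        if (systemMemory < MIN_SYSTEM_MEMORY) {
            throw new IllegalArgumentException(
                "Executor memory " + systemMemory
                + " must be at least " + MIN_SYSTEM_MEMORY);
        }
    }

    public static void main(String[] args) {
        checkExecutorMemory(5L * 1024 * 1024 * 1024); // 5 GB: passes
        try {
            checkExecutorMemory(300L * 1024 * 1024);  // 300 MB: fails, as in the log above
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The two byte counts in the log line match this arithmetic: 314572800 bytes is 300 MB, and 471859200 bytes is the 450 MB minimum.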
User Memory is the memory used to store user-defined data structures, Spark internal metadata, any UDFs created by the user, and the data needed for RDD conversion operations, such as RDD dependency information, etc.
For example, we can rewrite Spark aggregation by using the mapPartitions transformation to maintain a hash table for this aggregation to run, which would consume so-called user memory.
This memory segment is not managed by Spark. Spark will not be aware of or maintain this memory segment.
Formula:
(Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction)
Spark Memory is the memory pool managed by Apache Spark. Spark Memory is responsible for storing intermediate states while doing task execution like joins or storing broadcast variables.
All the cached or persistent data will be stored in this segment, specifically in the storage memory of this segment.
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction
Spark tasks operate in two main memory regions: Storage Memory and Execution Memory.
The boundary between them is set by spark.memory.storageFraction parameter, which defaults to 0.5 or 50%.
Storage Memory is used for storing all of the cached data, broadcast variables, unrolled data, etc. “Unroll” is essentially a process of deserializing serialized data.
Any persistent option that includes memory in it will store that data in this segment.
Spark clears space for new cache requests by removing old cached objects based on the Least Recently Used (LRU) mechanism.
Once cached data is evicted from storage memory, it is either written to disk or recomputed, depending on its persistence level. Broadcast variables are stored in the cache at the MEMORY_AND_DISK persistence level. This segment holds long-lived cached data.
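The LRU eviction behavior can be illustrated with a plain Java LinkedHashMap in access order; this is a conceptual sketch only, not Spark's actual BlockManager implementation (the block names are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCacheSketch(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: iteration order follows access
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry
    }

    public static void main(String[] args) {
        LruCacheSketch<String, String> cache = new LruCacheSketch<>(2);
        cache.put("rdd_0_0", "block0");
        cache.put("rdd_0_1", "block1");
        cache.get("rdd_0_0");           // touch block0, making block1 the LRU entry
        cache.put("rdd_0_2", "block2"); // no free space: block1 is evicted
        System.out.println(cache.keySet()); // [rdd_0_0, rdd_0_2]
    }
}
```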
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction
Execution Memory is used for storing the objects required during the execution of Spark tasks.
For example, it is used to store the shuffle intermediate buffer on the map side in memory. Also, it is used to store a hash table for the hash aggregation step.
This pool also supports spilling on disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).
Execution memory tends to be more short-lived than storage. It is evicted immediately after each operation, making space for the next ones.
Formula:
(Java Heap - Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)
In Spark 1.6+, there is no hard boundary between execution memory and storage memory.
Due to the nature of execution memory, blocks cannot be forcefully evicted from this pool; otherwise, execution will break since the block it refers to won’t be found.
But when it comes to storage memory, blocks can be evicted from memory and written to disk or recomputed (if the persistence level is MEMORY_ONLY) as required.
Storage and execution pool borrowing rules:
Storage memory can borrow space from execution memory only if there is free space in the execution pool.
Execution memory can likewise borrow free space from the storage pool.
If execution needs more memory and storage has borrowed from the execution side, execution can forcefully evict the blocks occupying its borrowed space.
If storage needs more memory and execution has borrowed from the storage side, storage cannot forcefully evict execution blocks; it must wait until execution releases the borrowed space.
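The borrowing behavior described above can be sketched as a toy model (a simplified illustration only, not Spark's actual StorageMemoryPool/ExecutionMemoryPool implementation; eviction of cached blocks is elided):

```java
// Toy model of the unified storage/execution region.
public class UnifiedPoolSketch {
    private long storageUsed = 0;
    private long executionUsed = 0;
    private final long total; // combined unified region (Spark Memory)

    UnifiedPoolSketch(long total) {
        this.total = total;
    }

    // Storage may take any free unified memory, but can never evict execution.
    boolean acquireStorage(long bytes) {
        long free = total - storageUsed - executionUsed;
        if (bytes <= free) { storageUsed += bytes; return true; }
        return false; // must wait until execution releases memory
    }

    // Execution may take any free unified memory; in real Spark it could also
    // evict cached blocks down to the storage-region boundary (elided here).
    boolean acquireExecution(long bytes) {
        long free = total - storageUsed - executionUsed;
        if (bytes <= free) { executionUsed += bytes; return true; }
        return false;
    }

    public static void main(String[] args) {
        UnifiedPoolSketch pool = new UnifiedPoolSketch(1000);
        System.out.println(pool.acquireStorage(700));   // true: storage borrows past the 50% boundary
        System.out.println(pool.acquireExecution(300)); // true: execution takes the remaining free space
        System.out.println(pool.acquireStorage(50));    // false: storage cannot evict execution memory
    }
}
```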
To calculate reserved memory, user memory, spark memory, storage memory, and execution memory, we will use the following parameters:
spark.executor.memory=5g
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
Java Heap Memory = 5 GB
= 5 * 1024 MB
= 5120 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory)
= 5120 MB - 300 MB
= 4820 MB
User Memory = Usable Memory * (1.0 - spark.memory.fraction)
= 4820 MB * (1.0 - 0.6)
= 4820 MB * 0.4
= 1928 MB
Spark Memory = Usable Memory * spark.memory.fraction
= 4820 MB * 0.6
= 2892 MB
Spark Storage Memory = Spark Memory * spark.memory.storageFraction
= 2892 MB * 0.5
= 1446 MB
Spark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction)
= 2892 MB * ( 1 - 0.5)
= 2892 MB * 0.5
= 1446 MB
Reserved Memory - 300 MB - 5.85%
User Memory - 1928 MB - 37.65%
Spark Memory - 2892 MB - 56.48%
Off-heap memory means allocating memory objects (serialized to byte arrays) outside the heap of the Java Virtual Machine (JVM). This memory is managed directly by the operating system rather than the JVM and, since it resides in native memory outside the process heap, it is not touched by the garbage collector.
The result of this is to keep a smaller heap to reduce the impact of garbage collection on the application.
Accessing this data is slightly slower than accessing the on-heap storage, but still faster than reading/writing from a disk. The downside is that the user has to manually deal with managing the allocated memory.
This model does not allocate within JVM-managed memory; instead, it uses Java's Unsafe API, much like C's malloc(), to request memory directly from the operating system. Because such memory is not subject to JVM memory management, frequent GC is avoided. The disadvantage is that the logic for allocating and releasing this memory must be implemented manually.
Spark 1.6+ began to introduce Off-heap memory (SPARK-11389). Unified Memory Manager can optionally be allocated using off-heap memory.
Parameter | Description |
spark.memory.offHeap.enabled (default false) | The option to use off-heap memory for certain operations |
spark.memory.offHeap.size (default 0) | The total amount of memory in bytes for off-heap allocation. It has no impact on heap memory usage, so make sure not to exceed your executor's total limits. |
By default, Off-heap memory is disabled, but we can enable it by the spark.memory.offHeap.enabled (false by default) parameter, and set the memory size by spark.memory.offHeap.size (0 by default) parameter.
spark-shell \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g
Off-heap memory supports OFF_HEAP persistence level. Compared to the on-heap memory, the model of the off-heap memory is relatively simple, including only Storage Memory and Execution Memory.
If the off-heap memory is enabled, there will be both on-heap and off-heap memory in the Executor.
Spark UI with OffHeap Enabled
The Execution Memory in the Executor is the sum of the Execution memory inside the heap and the Execution memory outside the heap. The same is true for Storage Memory.
Note: Project Tachyon supports in-memory storage for Spark RDDs in off-heap space.
Project Tungsten supports storing shuffle objects in off-heap space.
Advantage(s):
When an executor is killed, all cached data for that executor is gone, but with off-heap memory the data persists: the lifetime of the JVM and the lifetime of the cached data are decoupled.
Disadvantage(s): the user has to manually deal with managing the allocated memory, and accessing off-heap data is slightly slower than accessing on-heap data.
Let's launch the Spark shell with 5GB On Heap Memory to understand the Storage Memory in Spark UI.
spark-shell \
--driver-memory 5g \
--executor-memory 5g
The available Storage Memory displayed on the Spark UI Executors tab is 2.7 GB, as follows:
Based on our 5GB calculation, we can see the following memory values:
Java Heap Memory = 5 GB
Reserved Memory = 300 MB
Usable Memory = 4820 MB
User Memory = 1928 MB
Spark Memory = 2892 MB = 2.8242 GB
Spark Storage Memory = 1446 MB = 1.4121 GB
Spark Execution Memory = 1446 MB = 1.4121 GB
From the Spark UI, the Storage Memory value is 2.7 GB, but our calculation gives 1.4121 GB. The values do not match because the Spark UI's Storage Memory value is actually the sum of Storage Memory and Execution Memory.
Storage Memory = Spark Storage Memory + Spark Execution Memory
= 1.4121 GB + 1.4121 GB
= 2.8242 GB
The Spark UI Storage Memory (2.7 GB) still does not match the calculated Storage Memory (2.8242 GB). Although we set --executor-memory to 5g, the memory the executor obtains through Runtime.getRuntime.maxMemory is only 4772593664 bytes, so the effective Java heap is 4772593664 bytes.
Java Heap Memory = 4772593664 bytes = 4772593664/(1024 * 1024) = 4551 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4551 - 300) MB = 4251 MB
User Memory = (Usable Memory * (1 - spark.memory.fraction)) = 1700.4 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 2550.6 MB
Spark Storage Memory = 1275.3 MB
Spark Execution Memory = 1275.3 MB
Spark Memory (2550.6 MB / 2.4908 GB) still does not match what is displayed on the Spark UI (2.7 GB) because we converted Java heap bytes into MB by dividing by 1024 * 1024, whereas the Spark UI converts bytes by dividing by 1000 * 1000.
Java Heap Memory = 4772593664 bytes = 4772593664/(1000 * 1000) = 4772.593664 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (4772.593664 - 300) MB = 4472.593664 MB
User Memory = (Usable Memory * (1 - spark.memory.fraction)) = 1789.0374656 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 2683.5561984 MB = ~ 2.7 GB
Spark Storage Memory = 1341.7780992 MB
Spark Execution Memory = 1341.7780992 MB
Logic the Spark UI uses for converting bytes into GB (decimal units, k = 1000):
function formatBytes(bytes, type) {
  if (type !== 'display') return bytes;
  if (bytes == 0) return '0.0 B';
  var k = 1000;
  var dm = 1;
  var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
  var i = Math.floor(Math.log(bytes) / Math.log(k));
  return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}
The same logic using binary units (k = 1024), which matches the JVM-side calculation:
function formatBytes(bytes, type) {
  if (type !== 'display') return bytes;
  if (bytes <= 0) return '0.0 B';
  var k = 1024;
  var dm = 1;
  var sizes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
  var i = Math.floor(Math.log(bytes) / Math.log(k));
  return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}
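The discrepancy between the 1000-based and 1024-based conversions is easy to verify with a small standalone check, using the heap size from the example above:

```java
public class ByteConversion {
    public static void main(String[] args) {
        // Heap size reported by Runtime.getRuntime().maxMemory() for --executor-memory 5g
        long heapBytes = 4772593664L;

        long decimalMB = heapBytes / (1000 * 1000); // Spark UI convention
        long binaryMB  = heapBytes / (1024 * 1024); // JVM/OS convention

        System.out.println("decimal: " + decimalMB + " MB"); // 4772 MB
        System.out.println("binary : " + binaryMB + " MB");  // 4551 MB
    }
}
```

The same byte count thus reads as 4772 MB in the UI but only 4551 MB in JVM terms, which fully accounts for the gap worked through above.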
Let's launch the spark shell with 1GB On Heap memory and 5GB Off Heap memory to understand the Storage Memory.
spark-shell \
--driver-memory 1g \
--executor-memory 1g \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g
The available Storage Memory displayed on the Spark UI Executors tab is 5.8 GB, as follows:
Storage Memory = On Heap Memory + Off Heap Memory
Java Heap Memory = 954728448 bytes = 954728448/1000/1000 = 954 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory) = (954 - 300) MB = 654 MB
User Memory = (Usable Memory * (1 - spark.memory.fraction)) = 261.6 MB
Spark Memory = (Usable Memory * spark.memory.fraction) = 392.4 MB
Spark Storage Memory = 196.2 MB
Spark Execution Memory = 196.2 MB
spark.memory.offHeap.size = 5 GB = 5 * 1024 * 1024 * 1024 bytes = 5368709120 bytes = 5368709120/(1000 * 1000) MB = 5368.7 MB
Storage Memory = On Heap Memory + Off Heap Memory
= 392.4 MB + 5368.7 MB
= 5761.1 MB
= ~5.8 GB
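As a cross-check in UI units (bytes divided by 1000 * 1000, with the 5g off-heap size taken as 5 * 1024^3 bytes), a small sketch reproduces roughly the 5.8 GB figure shown in the UI:

```java
public class OffHeapStorageMemory {
    public static void main(String[] args) {
        long heapBytes = 954728448L;             // Runtime.getRuntime().maxMemory() for a 1g heap
        long reservedBytes = 300L * 1000 * 1000; // 300 MB, expressed in the UI's decimal units
        double fraction = 0.6;                   // spark.memory.fraction

        // On-heap Spark Memory in decimal MB, as the UI computes it
        double onHeapMB = (heapBytes - reservedBytes) / 1e6 * fraction;

        // 5g off-heap is parsed as 5 * 1024^3 bytes, then shown in decimal MB
        double offHeapMB = 5L * 1024 * 1024 * 1024 / 1e6;

        double totalGB = (onHeapMB + offHeapMB) / 1000.0;
        System.out.printf("Storage Memory in UI: %.1f GB%n", totalGB); // ~5.8 GB
    }
}
```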
Simple Java Program to calculate the Spark Memory:
// JVM Arguments: -Xmx5g
public class SparkMemoryCalculation {

    private static final long MB = 1024 * 1024;
    private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * MB;
    private static final double SPARK_MEMORY_STORAGE_FRACTION = 0.5;
    private static final double SPARK_MEMORY_FRACTION = 0.6;

    public static void main(String[] args) {
        long systemMemory = Runtime.getRuntime().maxMemory();
        long usableMemory = systemMemory - RESERVED_SYSTEM_MEMORY_BYTES;

        long sparkMemory = (long) (usableMemory * SPARK_MEMORY_FRACTION);
        long userMemory = (long) (usableMemory * (1 - SPARK_MEMORY_FRACTION));
        long storageMemory = (long) (sparkMemory * SPARK_MEMORY_STORAGE_FRACTION);
        long executionMemory = (long) (sparkMemory * (1 - SPARK_MEMORY_STORAGE_FRACTION));

        printMemoryInMB("Heap Memory\t\t", systemMemory);
        printMemoryInMB("Reserved Memory", RESERVED_SYSTEM_MEMORY_BYTES);
        printMemoryInMB("Usable Memory\t", usableMemory);
        printMemoryInMB("User Memory\t\t", userMemory);
        printMemoryInMB("Spark Memory\t", sparkMemory);
        printMemoryInMB("Storage Memory\t", storageMemory);
        printMemoryInMB("Execution Memory", executionMemory);

        System.out.println();

        printStorageMemoryInMB("Spark Storage Memory", sparkMemory);
        printStorageMemoryInMB("Storage Memory UI \t", storageMemory);
        printStorageMemoryInMB("Execution Memory UI", executionMemory);
    }

    // Binary units (1024 * 1024), as the JVM reports memory
    private static void printMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / MB) + " MB");
    }

    // Decimal units (1000 * 1000), as the Spark UI displays memory
    private static void printStorageMemoryInMB(String type, long memory) {
        System.out.println(type + " \t=\t" + (memory / (1000 * 1000)) + " MB");
    }
}
Thanks for visiting this article. If you liked this article, give kudos.