Created on 07-01-2016 02:19 PM - edited 08-17-2019 11:50 AM
Spark is notoriously knobby when it comes to tuning applications and requesting resources. While this added freedom gives us an incredible amount of control and flexibility when running our applications, it also gives us an opportunity to make a lot of mistakes. This guide is for Spark 1.5 and earlier. Spark 1.6 introduces a new setting that allows for a less rigid mapping of the memory areas.
There are a lot of knobs to tune in Spark. Knowing which ones do what will allow you to maximize your memory usage and understand why you're doing what you're doing.
Most people think that when you request 10g for a worker with Spark, your application has 10g of memory for caching, but it's actually quite a bit more complex than that. A Spark executor has several memory areas allocated, and knowing what these areas are for will help you understand the best allocation of resources.
An executor has two regions: room for caching and room for pre-shuffle aggregations. Depending on the workload, we can optimize these to use a minimal amount of resources when running our application. Let's take a look at a picture that sums up our executor at a high level.
When we request a 10gb executor, YARN will allocate a JVM on a worker node that looks something like the picture to the left. The first thing that sticks out is that we have a 10gb JVM, but Spark has a "safety" region. This is to help prevent OOMs. The safety region is 90% of total executor memory; it is controlled internally and not exposed to developers. The configuration is named spark.storage.safetyFraction. So our 10gb executor has 9gb of usable space.
The next thing we see is two distinct boxes: shuffle and storage. These are percentages of the total safety memory. They are controlled by two configs, spark.storage.memoryFraction and spark.shuffle.memoryFraction, which default to 60% and 20%. So with a 10gb executor, we have 10gb*90%*60% or 5.4gb for "storage." That means each 10gb executor has 5.4gb set aside for caching data. If the data doesn't fit entirely into memory then, depending on the storage level, it may spill to disk or only cache what it can. This will be discussed later on.
There is another internal config named spark.shuffle.safetyFraction, which defaults to 80% of the JVM heap. This is also to help avoid OOMs. The area labeled "shuffle" is used for pre-reduce aggregations. For example, when a reduceByKey happens, the reduce happens locally first and the shuffle memory is used for storing the results. If the results of the local reduce don't fit into this area, Spark will start spilling to disk. In this example, we have 10gb*80%*20% or 1.6gb per executor for pre-reduce aggregations.
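The arithmetic for both regions can be sketched in a few lines of plain Python. This is only a back-of-the-envelope calculator, not anything Spark itself runs: the function name `executor_memory_regions` and its parameter names are made up for illustration, and the defaults are the Spark 1.5 values quoted above.

```python
def executor_memory_regions(heap_gb,
                            storage_fraction=0.6, storage_safety=0.9,
                            shuffle_fraction=0.2, shuffle_safety=0.8):
    """Approximate storage and shuffle region sizes (in gb) for one executor.

    Hypothetical helper: the defaults mirror spark.storage.memoryFraction,
    spark.storage.safetyFraction, spark.shuffle.memoryFraction and
    spark.shuffle.safetyFraction as described in the text.
    """
    storage_gb = heap_gb * storage_safety * storage_fraction
    shuffle_gb = heap_gb * shuffle_safety * shuffle_fraction
    return {"storage_gb": round(storage_gb, 2), "shuffle_gb": round(shuffle_gb, 2)}


# The 10gb executor used as the running example in this article.
regions = executor_memory_regions(10)
print(regions)
```

Changing `heap_gb` or either fraction shows quickly how much room a given executor size leaves for caching versus pre-reduce aggregations.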
Now that we know what our memory is allocated for, we need to talk a little bit about caching data before we can talk about sizing the executors. In Spark, when the developer calls .persist or .cache on an RDD, the next action will cause the data to be persisted or cached in memory. rdd.cache() is the equivalent of rdd.persist(StorageLevel.MEMORY_ONLY). There are several storage levels to choose from, and the choice decides what happens if the RDD doesn't fit into memory entirely.
Different storage levels have different implications. When working with Spark in Scala and we decide to persist an RDD, we have several choices of how to cache our data.
**NOTE: the default serializer in Spark is the Java serializer. It's not great. I would highly suggest using the Kryo serializer for all Spark applications.**
There are several more, but the above three can fit most use cases. OFF_HEAP storage is becoming popular, and is looking like it will be the default in Spark 2.0, but as of 1.5 it is still experimental.
The only way to tell how much memory your cached data is taking up is in the Storage tab in the Spark UI. The Spark UI will be very important to us when trying to accurately size executors.
Now onto the fun part of the article: sizing executors. Unfortunately there is no blanket statement that covers all workloads. Let's start with some general rules of thumb:
The rules of thumb are good for executor sizing, but the actual workload should drive the resource request. There are several types of workloads; let's break them down into ETL, Data Analysis with caching, and Machine Learning.
spark-submit --class MyClass --executor-memory 4g --executor-cores 2 --num-executors 10 --conf spark.storage.memoryFraction=0 --conf spark.shuffle.memoryFraction=1 /path/to/app.jar
spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 /path/to/app.jar
spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 --conf spark.driver.memory=2g --conf spark.driver.cores=2 /path/to/app.jar
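When you end up trying several sizings, it can help to generate the spark-submit command from a handful of parameters rather than hand-editing a long line. Below is a minimal sketch in plain Python. The helper `build_spark_submit` is hypothetical and only assembles the flags already used in the commands above; it does not launch anything.

```python
import shlex


def build_spark_submit(main_class, jar, executor_memory, executor_cores,
                       num_executors, conf=None):
    """Assemble a spark-submit argument list (hypothetical helper)."""
    args = ["spark-submit", "--class", main_class,
            "--executor-memory", executor_memory,
            "--executor-cores", str(executor_cores),
            "--num-executors", str(num_executors)]
    # Each extra setting becomes its own "--conf key=value" pair.
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    args.append(jar)
    return args


# The first command above: no cache, all managed memory given to shuffle.
etl_args = build_spark_submit(
    "MyClass", "/path/to/app.jar", "4g", 2, 10,
    conf={"spark.storage.memoryFraction": 0,
          "spark.shuffle.memoryFraction": 1})
etl_command = " ".join(shlex.quote(arg) for arg in etl_args)
print(etl_command)
```

Keeping the arguments as a list until the last step avoids quoting mistakes if a path or value ever contains spaces.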
All of these are general starting points. The most important part, once we start our job, is to go to the Spark UI and monitor memory usage. The Storage tab will be instrumental in determining the amount of resources your application is using.
To show an example, I'm going to start a spark-shell using the following command:
spark-shell --master yarn-client --executor-memory 1g --num-executors 2
The first thing we notice is that each executor has Storage Memory of 530mb, even though I requested 1gb. If we do the math, 1gb * .9 (safety) * .6 (storage), we get 540mb, which is pretty close to 530mb. The storage memory and shuffle write are the two things we need to monitor most closely when first tuning an application. The shuffle write corresponds to the amount of data that was written to disk prior to a shuffle operation. The storage memory is the amount of memory being used/available on each executor for caching. These two columns should help us decide whether we have too much executor or too little. If we have large amounts of shuffle write, we should probably increase spark.shuffle.memoryFraction to reduce the intermediate writes to disk. If we're using all of our cache, we might want to increase the size of the executor.
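The same sanity check can be scripted for whatever executor size you request. This sketch is plain Python with a made-up helper name, `expected_storage_mb`. It follows the article in treating 1gb as 1000mb, and the 530mb figure is simply the value read off the Spark UI above.

```python
def expected_storage_mb(executor_memory_mb, storage_fraction=0.6, safety_fraction=0.9):
    """Storage memory we would expect the UI to report (hypothetical helper)."""
    return executor_memory_mb * safety_fraction * storage_fraction


requested_mb = 1000  # --executor-memory 1g, counted as 1000mb as in the text
observed_mb = 530    # Storage Memory per executor shown in the Spark UI

expected_mb = expected_storage_mb(requested_mb)
gap_pct = (expected_mb - observed_mb) / expected_mb * 100
print(f"expected ~{expected_mb:.0f}mb, observed {observed_mb}mb, gap {gap_pct:.1f}%")
```

A gap of a couple of percent is unremarkable. A much larger gap would suggest the executor is not running with the memory or fractions you think it is.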
This should give you a good glimpse into memory management with Spark, how to initially size executors, and how to calibrate further. One last thing to note:
spark.storage.memoryFraction defaults to 0.6, which corresponds to the size of Old Gen. Never make this number larger unless you also increase the size of Old Gen.
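One rough way to check that constraint is to compare the storage fraction with the share of the heap that Old Gen gets. The sketch below assumes a HotSpot JVM sized with -XX:NewRatio, where Old Gen is NewRatio times the size of the young generation. The value 2 is only a commonly seen default, so confirm what your executors actually run with. Both function names are hypothetical.

```python
def old_gen_fraction(new_ratio):
    """Share of the heap given to Old Gen when -XX:NewRatio=new_ratio.

    Old Gen is new_ratio times the young generation, so it takes
    new_ratio / (new_ratio + 1) of the heap.
    """
    return new_ratio / (new_ratio + 1)


def storage_fits_old_gen(storage_fraction, new_ratio):
    """True if the cache fraction does not exceed the Old Gen share (conservative check)."""
    return storage_fraction <= old_gen_fraction(new_ratio)


assumed_new_ratio = 2  # assumption, not taken from the article
print(round(old_gen_fraction(assumed_new_ratio), 3))
print(storage_fits_old_gen(0.6, assumed_new_ratio))
print(storage_fits_old_gen(0.75, assumed_new_ratio))
```

If the check fails for the fraction you want, the options are the ones the note above names: leave spark.storage.memoryFraction alone, or enlarge Old Gen first.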