Community Articles

Find and share helpful community-sourced technical articles.
Labels (2)
avatar
Super Collaborator

Spark is notoriously knobby when it comes to tuning applications and requesting resources. While this added freedom gives us an incredible amount of control and flexibility when running our applications, it also gives us an opportunity to make a lot of mistakes. This guide is for Spark 1.5 and earlier. Spark 1.6 introduces a new setting that allows for a less rigid mapping of the memory areas.

There are a lot of knobs in spark to tune, knowing which ones do what will allow you to maximize your memory usage, and understand why you’re doing what you're doing.

Most people think when requesting 10g for a worker with Spark, your application has 10g of memory for caching, but its actually quite more complex than that. A Spark executor has several memory areas allocated, and knowing what these areas are for will help understand the best allocation for resources.

An executor has two regions, room for caching, and room for aggregations pre-shuffle. Depending on the workload, we can optimize these to use a minimal amount of resources when running our application. Lets take a look at a picture that at a high level sums up our executor.

5387-screen-shot-2016-06-30-at-54332-pm.png

When we request an executor of 10gb to YARN will allocate a JVM on a worker node that looks like something to the left. So the first thing that sticks out is that we 10gb JVM, but Spark has a "safety" region. This is to help prevent OOMs. This number is 90% of total executor memory and is controlled internally and not exposed to the developers. The configuration is named: spark.storage.safetyFraction. So our 10gb executor has 9gb of usable space.

The next thing we see is two distinct boxes: shuffle and storage. These are percentages of the total safety memory. They are controlled by two configs: spark.storage.memoryFraction and spark.shuffle.memoryFraction which are by default 60% and 20%. So with a 10gb executor, we have 90%*60% or 5.4gb for "storage." That means each 10gb executor has 5.4 gb set aside for caching data. If the data doesn't fit entirely into memory, depending on the storage level, it may spill to disk or only cache what it can, this will be discussed later on.

There is another internal config named spark.shuffle.safetyFraction which is by default 80% or 80% of the JVM heap. This is to help avoid OOM. The area labeled "shuffle" is used for pre-reduce aggregations. An example of using this area is when a reduceByKey happens, the reduce happens locally first and the shuffle memory is used for storing the results. If the results of the local reduce by key don't fit into this area, spark will start spilling to disk. In this example, we have 80%*20% or 1.6gb per executor for pre-reduce aggregations.

Now that we know what our memory is allocated for, before we can talk about sizing the executors we need to talk a little bit about caching data. In spark, when the developer calls .persist or .cache on an rdd, the next action will cause the data to be persisted or cached in memory. rdd.cache() is the equivalent of rdd.persist(storageLevel.MEMORY_ONLY). There are several storage levels to choose from, and this will decide what we do if the rdd doesn't fit into memory entirely.

When choosing different storage levels, they have different implications. When working with Spark Scala, and we decide to persist an rdd, we have several choices of how to cache our data.

  • MEMORY_ONLY -> This stores our data as java objects. Its not very space efficient as the size of the java object for an int or sting has a lot of extra packing, but this is the fastest. If your data can be completely cached with MEMORY_ONLY, choose that and move on.
  • MEMORY_ONLY_SER -> This stores data in a serialized format. This allows us to fit substantially more data into memory, but there is a small expense in the fact that data must be serialized/deserialized when used, so this is a bit more CPU intensive. This is a typically a very good choice for caching data
  • MEMORY_AND_DISK_SER -> This stores data in a serialized format. This has the same advantages as the previous one, but anything that doesn't fit in memory will be spilled to local disk in a serialized format. This is a good choice if there has been heavy processing, or any joins/reduces prior to caching.

    **NOTE, the default serializer in Spark is the java serializer. Its not great. I would highly suggest using the Kryo Serializer for all spark applications.**

There are several more, but the above three can fit most use cases. OFF_HEAP storage is becoming popular, and is looking like it will be the default in Spark 2.0, but as of 1.5 it is still experimental.

The only way to tell how much memory your cached data is taking up is in the Storage tab in the Spark UI. The Spark UI will be very important to us when trying to accurately size executors.

Now onto the fun part of the article, sizing executors. Unfortunately there is no blanket statement that covers all workloads. Let's start by some general rules of thumbs

  1. The JVM heap should be no larger than 64gb. 40 gb seems to be the sweet spot for max size. This is due to garbage collection. If the executor gets too big, GC will kill your application performance. New GC methods are available, like G1C1, but this guide is assuming we're using the legacy method.
  2. The executor should use no more than 4 cores. Through trial and error, it seems we get full return on investment with 4 cores, so 4 cores is the equivalent of to 400% more throughput if 1 core was the equivalent of 100% throughput. At 5 cores, the return on investment seems to be about 430-450%, so it doesn't make sense to add that extra core.
  3. Fewer large executors are generally better than many small executors. The more executors we have, the more we need to pass data around the network. There is one caveat to this, it is sometimes hard to get 4 cores and 40gb of resources from YARN on a single node, so if the cluster is highly utilized, it may take longer for the job to start.

The rules of thumb are good for executor sizing, but the actual work load should drive the resource requesting. There are several types of workloads, and lets break them down to ETL, Data Analysis with caching, and Machine Learning.

  • ETL. In ETL there are typically limited reduces and many jobs are map only jobs. Because of this, it is actually much better to request many smaller executors to help with data locality. In addition, in ETL caching data doesn't happen very often so we need to remove the storage aspect of the executors entirely. A good starting point for requesting resources on ETL is to request many 4gb executors with 2 cores, and set the spark.storage.memoryFraction to 0 and the spark.shuffle.memoryFraction to 1. This will allow us to leverage the executor to do only what we need, and thats space for joins and reduces. In addition, we care about the total size of the data set involved. A good starting point is 1gb of ram per gb of data on disk. A common spark-submit string may look something like the below for a 40gb dataset
spark-sumbit --class MyClass --executor-memory 4g --executor-cores 2 --num-executors 10 --conf spark.storage.memoryFraction=0 --conf spark.shuffle.memoryFraction=1 /path/to/app.jar
  • Data Analysis with Caching. In data analysis, the same datasets are often queried several times. In this case, we want to take advantage of caching and limiting HDFS reads. Data analysis with caching leans more on our rules of thumb. A good starting point for number of executors would be 2gb ram per gb of dataset. A sample spark-submit string may look like the below with a 40gb dataset
spark-sumbit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 /path/to/app.jar
  • Machine Learning can actually leverage the same idea as data analysis with caching with one small exception. When building models, the application driver has more pressure as it does some work when building the model. Adding more memory/cores to the driver will help the application run smoother. Also, the driver memory is only required if you're receiving an OOM error. A sample spark-submit string may look like the below for a 40gb dataset
spark-sumbit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 --conf spark.driver.memory=2g --conf spark.driver.cores=2 /path/to/app.jar

Now all of this is general starting points. The most important part is after we start our job, is to go to the Spark UI and monitor memory usage. The storage tab will be instrumental when trying to determine the amount of resources your application is using.

To show an example, I'm going to start a spark-shell using the following command:

spark-shell --master yarn-client --executor-memory 1g --num-executors 2

5390-screen-shot-2016-06-30-at-94820-pm.png

The first thing we notice, is that each executor has Storage Memory of 530mb, even though I requested 1gb. If we do the math 1gb * .9 (safety) * .6 (storage) we get 540mb, which is pretty close to 530mb. The storage memory and shuffle write are the two things we need to monitor the closest when first tuning application. The shuffle write corresponds to amount of data that was spilled to disk prior to a shuffle operation. The storage memory is the amount of memory being used/available on each executor for caching. These two columns should help us decide if we have too much executor or too little. If we have larges amount of shuffle write, we should probably increase the spark.shuffle.memoryFraction, to reduce the intermediate writes to disk. If we're using all of our cache, we might want to increase the size of the executor.

This should give you a good glimpse into memory management with spark, and how to initially size executors and further calibration. One last thing to note

spark.storage.memoryFraction is 0.6 default which corresponds to Old Gen, never make this number larger unless you increase the size of old gen.

38,066 Views
Comments
avatar
New Contributor

My best practice:- Keep the number of executors equal to the number of spark clients that your cluster is configured for. Go with 2 cores per executor. You will fetch optimum performance with consistent distribution of batches for processing as confirmed from Spark UI.