Created on 01-27-2017 09:32 PM
Ensure YARN container size is optimized for Spark. Aim for no more than 1-2 containers per CPU core.
Minimize the use of <operation>ByKey functions (other than reduceByKey, which is recommended). These functions carry a lot of overhead from shuffling data via the internal hash tables. They are useful, but use them sparingly to get the best performance.
Don't always rely on the default RDD partitioning. Left alone, Spark picks the partition count from its configured defaults (spark.default.parallelism for RDD shuffles, spark.sql.shuffle.partitions for DataFrame shuffles) or from the input splits, not from what suits your job. Redefining that default in a way that makes sense for dataset sizes and number of containers will get better performance. If there isn't a fixed number that works across the board, manually repartitioning early in jobs can help to speed things up.
Use the G1 garbage collector instead of the parallel garbage collector. It takes a little fine tuning with memory, but it performs better.
Use DataFrames where possible. As new versions of Spark roll out, much of the performance optimization goes into DataFrame operations, and you won't benefit from it with plain RDD operations.
If streaming jobs are being used, make sure to take advantage of checkpointing. It adds an extra layer of reliability, as Spark Streaming is really just micro-batching.
Use the KryoSerializer, Snappy compression, and enable Tungsten.
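As a rough sketch of the serializer, compression, and garbage collector settings above, the snippet below collects them as Spark properties and renders them as spark-submit --conf flags. The property keys are standard Spark settings, but the helper name render_conf_flags and the choice to pass them on the command line are assumptions for illustration; the same keys could just as well go in spark-defaults.conf. Tungsten is left out because how it is enabled depends on the Spark version.

```python
# Illustrative only: the keys are standard Spark properties,
# the dict name and helper are hypothetical.
SPARK_TUNING_CONF = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.io.compression.codec": "snappy",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC",
}


def render_conf_flags(conf):
    """Turn a dict of Spark properties into a flat list of spark-submit flags."""
    flags = []
    for key, value in sorted(conf.items()):
        flags.extend(["--conf", f"{key}={value}"])
    return flags


if __name__ == "__main__":
    # Prints the flags on one line, ready to paste after `spark-submit`.
    print(" ".join(render_conf_flags(SPARK_TUNING_CONF)))
```

G1 usually needs further JVM flags tuned against your heap size, so treat the single -XX:+UseG1GC option as a starting point only.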
Beyond these, memory tuning for performance is also somewhat tied to dataset size and complexity of operations. A large number of smaller containers is usually recommended, but there are some exceptions, e.g. a job with a low operation count that relies on the underlying hash and a large dataset that needs to be persisted. This is an edge case, though.
That being said, the first thing to be mindful of, especially for YARN container configuration, is that the total amount of memory allocated to YARN really shouldn’t exceed 75% of the system total. That percentage may need to be even lower depending on the process tenancy of the nodes. The more work a node is doing, the more discrete memory other processes need, and the less YARN has to safely play with. On top of that, each container with a Spark process is going to have its own v-core assigned to it. If the number of containers per node greatly exceeds the number of cores on the box, proper parallelization of the workload breaks down: tasks have to share physical cores too heavily, which hurts job performance and can even cause failures due to timeouts or other infrastructure-related bottlenecks. I try to keep the number of containers per node about equal to the number of physical cores, though you can pretty safely double that number without too much of a performance hit. Any more than that and it’s getting into a grey area.
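To make the arithmetic concrete, here is a minimal sketch of that rule of thumb: cap YARN at 75% of node memory and keep containers at one to two per physical core. The function name, parameter names, and the 128 GB / 16 core node are hypothetical values chosen for illustration.

```python
def yarn_node_budget(total_mem_gb, physical_cores,
                     yarn_fraction=0.75, containers_per_core=1):
    """Rough per-node YARN budget following the rule of thumb in the text.

    yarn_fraction: share of node memory given to YARN (0.75 is the ceiling
                   suggested above; busy multi-tenant nodes may need less).
    containers_per_core: 1 is the conservative choice, 2 the suggested maximum.
    """
    if not 0 < yarn_fraction <= 0.75:
        raise ValueError("yarn_fraction should be in (0, 0.75]")
    if not 1 <= containers_per_core <= 2:
        raise ValueError("containers_per_core should be between 1 and 2")

    yarn_mem_gb = total_mem_gb * yarn_fraction
    containers = int(physical_cores * containers_per_core)
    return {
        "yarn_mem_gb": yarn_mem_gb,
        "containers": containers,
        # Upper bound only: real container sizes should leave headroom below this.
        "mem_ceiling_per_container_gb": yarn_mem_gb / containers,
    }


if __name__ == "__main__":
    # Hypothetical node: 128 GB RAM, 16 physical cores.
    print(yarn_node_budget(128, 16))
    # {'yarn_mem_gb': 96.0, 'containers': 16, 'mem_ceiling_per_container_gb': 6.0}
```

The per-container figure is a ceiling, not a target; sizing containers right up to it leaves no room for anything else YARN has to hold on the node.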
Also, the sum of memory used per node (memory per container * containers per node) should not be equal to the total YARN memory allocation. It should actually be less. There is additional system overhead for Spark jobs, which holds things such as broadcast variables, the hash table that tracks distributed data locations, job lineage, etc. Make sure you aren’t maxing out the memory YARN has available per node, because that will cause job failures, much like how you can get MapReduce failures if your YARN containers don’t have enough memory for things like garbage collection.
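A quick way to sanity-check this is to add a per-container overhead to each executor and see what is left of the node's YARN allocation. The sketch below assumes the commonly documented default for YARN executor memory overhead (10% of executor memory with a 384 MB floor); that default varies by Spark version, so treat the factor, the function names, and the example numbers as assumptions to verify against your cluster.

```python
def executor_overhead_mb(executor_mem_mb, factor=0.10, minimum_mb=384):
    """Off-heap overhead YARN adds per executor.

    Assumption: 10% of executor memory, never less than 384 MB.
    """
    return max(int(executor_mem_mb * factor), minimum_mb)


def node_headroom_mb(yarn_mem_mb, executor_mem_mb, executors_per_node):
    """Memory left in the node's YARN allocation after all executors are placed."""
    per_container = executor_mem_mb + executor_overhead_mb(executor_mem_mb)
    return yarn_mem_mb - per_container * executors_per_node


if __name__ == "__main__":
    # Hypothetical node: 96 GB for YARN, 16 executors of 5 GB each.
    headroom = node_headroom_mb(96 * 1024, 5 * 1024, 16)
    print(headroom)  # 8192 MB left over; a negative value means over-allocation
```

A result at or below zero means the containers consume the whole allocation, which is the situation the paragraph above warns about.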
Finally, something very important to be mindful of is the workflow itself. There are only a handful of Spark commands that actually execute work. The bulk of Spark commands are only built into a workflow lineage. If too many high-complexity or resource-intensive steps are taken between executions, it puts a lot of strain on the system and the available resources. Definitely take a look at what is being done in a job with an eye to the points where commands are actually executed. If there are a lot of wide transformations going on, there may be ways to streamline the workload, either by decreasing the number of those transformations or by making them more efficient (for example, applying narrow transformations first to reduce the dataset size before the wide transformations take place).
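The lineage-versus-execution idea can be illustrated with a toy model in plain Python. This is not Spark's API: LazyDataset is a hypothetical class that only mimics how transformations are recorded and nothing runs until an action (collect() here) is called. It also shows why filtering early helps, since the expensive step then sees fewer records.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records steps, runs them only on collect()."""

    def __init__(self, data, lineage=()):
        self._data = data
        self._lineage = tuple(lineage)

    def map(self, fn):
        # Transformation: extend the lineage, do no work yet.
        return LazyDataset(self._data, self._lineage + (("map", fn),))

    def filter(self, pred):
        # Transformation: extend the lineage, do no work yet.
        return LazyDataset(self._data, self._lineage + (("filter", pred),))

    def lineage(self):
        return [name for name, _ in self._lineage]

    def collect(self):
        # Action: replay the whole lineage over the data.
        rows = iter(self._data)
        for name, fn in self._lineage:
            rows = map(fn, rows) if name == "map" else filter(fn, rows)
        return list(rows)


calls = []

def expensive(x):
    calls.append(x)  # track how many records reach the costly step
    return x * x

ds = LazyDataset(range(10)).filter(lambda x: x % 2 == 0).map(expensive)
print(ds.lineage())  # ['filter', 'map']
print(len(calls))    # 0, nothing has executed yet
print(ds.collect())  # [0, 4, 16, 36, 64]
print(len(calls))    # 5, the filter halved the work before the costly step
```

In a real job the same reasoning applies at a larger scale: every step recorded before an action runs when that action fires, so it pays to know where those points are.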
Outside of the larger tuning, and more as basic best practice, only persist datasets when necessary. Persisting holds the entire dataset in memory, which speeds up work but also chews up RAM that complex operations need. When a dataset no longer needs to be in memory, remember to unpersist it. At the end of a job, unpersist any sets that were persisted so that no artifacts are left over. Also, two commands that can be very useful are coalesce() and repartition(). The former can decrease the number of partitions in a dataset. If you have a small dataset/RDD, you don’t necessarily need it in 100+ partitions. Enrichment sets are a good example of this. The latter can increase the number of partitions to distribute your data more widely if there are going to be a lot of parallel operations happening.
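As a small decision aid for the coalesce() versus repartition() choice, the helper below picks the call based on whether the partition count is going down or up. The function name and the example counts are hypothetical; the underlying behaviour it encodes is that coalesce() shrinks the partition count without a full shuffle, while repartition() always shuffles and is the one to use for increasing it.

```python
def partition_change(current, target):
    """Suggest which Spark call fits a change in partition count.

    Returns "coalesce" when shrinking, "repartition" when growing,
    and "none" when the count already matches.
    """
    if current < 1 or target < 1:
        raise ValueError("partition counts must be positive")
    if target < current:
        return "coalesce"      # merges partitions, avoids a full shuffle
    if target > current:
        return "repartition"   # full shuffle, spreads data more widely
    return "none"


if __name__ == "__main__":
    # Hypothetical small enrichment set spread over too many partitions.
    print(partition_change(200, 4))    # coalesce
    # Hypothetical large set that needs more parallelism.
    print(partition_change(16, 128))   # repartition
```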