Support Questions

Find answers, ask questions, and share your expertise

How to tune Spark for parallel processing when loading small data files

avatar

The issue is that the input data files to Spark are very small, about 6 MB (<100000 records). However, the required processing/calculations are heavy, which would benefit from running in multiple executors. Currently, all processing is running on a single executor even when specifying multiple executors to spark-submit.

1 ACCEPTED SOLUTION

avatar

First, you should try to take advantage if your data is stored in splittable formats (snappy, LZO, bzip2, etc). If so, then instruct Spark to split the data into multiple partitions upon read. In Scala, you can do this:

file = sc.textFile(Path, numPartitions)

You will also need to tune your YARN container sizes to work with your executor allocation. Make sure your Max Yarn Mem Alloc ('yarn.scheduler.maximum-allocation-mb') is bigger than what you are asking for per executor (this will include the default overhead of 384 MB).

The following parameters are used to allocate Spark executors and driver memory:

spark.executor.instances -- number of spark executors
spark.executor.memory -- memory per spark executors (plus 384 MB overhead)
spark.driver.memory -- memory per spark driver

6MB file is pretty small, much smaller than HDFS block size, so you are probably getting a single partition until you do something to repartition it. You can also set numPartitions parameter like this:

I would probably call one of these repartition methods on your DataFrame:

def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
Returns a new DataFrame partitioned by the given partitioning expressions into numPartitions. The resulting DataFrame is hash partitioned.

OR this:

def repartition(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.

View solution in original post

1 REPLY 1

avatar

First, you should try to take advantage if your data is stored in splittable formats (snappy, LZO, bzip2, etc). If so, then instruct Spark to split the data into multiple partitions upon read. In Scala, you can do this:

file = sc.textFile(Path, numPartitions)

You will also need to tune your YARN container sizes to work with your executor allocation. Make sure your Max Yarn Mem Alloc ('yarn.scheduler.maximum-allocation-mb') is bigger than what you are asking for per executor (this will include the default overhead of 384 MB).

The following parameters are used to allocate Spark executors and driver memory:

spark.executor.instances -- number of spark executors
spark.executor.memory -- memory per spark executors (plus 384 MB overhead)
spark.driver.memory -- memory per spark driver

6MB file is pretty small, much smaller than HDFS block size, so you are probably getting a single partition until you do something to repartition it. You can also set numPartitions parameter like this:

I would probably call one of these repartition methods on your DataFrame:

def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
Returns a new DataFrame partitioned by the given partitioning expressions into numPartitions. The resulting DataFrame is hash partitioned.

OR this:

def repartition(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.