How to tune Spark for parallel processing when loading small data files


The issue is that the input data files to Spark are very small, about 6 MB (fewer than 100,000 records). However, the required processing/calculations are heavy and would benefit from running on multiple executors. Currently, all processing runs on a single executor, even when multiple executors are requested via spark-submit.

1 ACCEPTED SOLUTION

Re: How to tune Spark for parallel processing when loading small data files

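First, check whether your data is stored in a splittable format. For plain text files, uncompressed data and bzip2 are splittable, LZO is splittable only once it has been indexed, and gzip is not splittable. Snappy is splittable only when used inside a container format such as SequenceFile, Avro, or Parquet. If the data is splittable, you can instruct Spark to split it into multiple partitions upon read. Note that the second argument to textFile is a minimum number of partitions, so Spark treats it as a hint rather than an exact count. In Scala, you can do this: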

val file = sc.textFile(path, numPartitions)
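To make the read-time hint concrete, here is a minimal sketch that compares the partition count of a default read with a read that passes a partition hint. The object name, the helper names, the temp-file input, and the `local[4]` master are assumptions for illustration only; on a cluster you would point `textFile` at your real HDFS path and let spark-submit supply the master.

```scala
import java.nio.file.{Files, Path}
import org.apache.spark.sql.SparkSession

object SmallFileRead {
  // Hypothetical helper: writes a small sample text file so the sketch runs without HDFS.
  def writeSample(records: Int): Path = {
    val p = Files.createTempFile("small-input", ".txt")
    Files.write(p, (1 to records).map(i => s"record-$i").mkString("\n").getBytes("UTF-8"))
    p
  }

  // Reads the file with a minimum-partition hint and reports how many partitions Spark created.
  def partitionsOnRead(spark: SparkSession, path: String, numPartitions: Int): Int =
    spark.sparkContext.textFile(path, numPartitions).getNumPartitions

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("small-file-read")
      .master("local[4]") // assumption: local mode for the demo; drop this when using spark-submit
      .getOrCreate()

    val path = writeSample(100000).toUri.toString
    val defaultCount = spark.sparkContext.textFile(path).getNumPartitions
    println(s"default read: $defaultCount partition(s)")
    println(s"with hint 8:  ${partitionsOnRead(spark, path, 8)} partition(s)")

    spark.stop()
  }
}
```

The hint only helps when the input is splittable. A single gzip file, for example, still arrives as one partition and has to be repartitioned after loading.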

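You will also need to tune your YARN container sizes to work with your executor allocation. Make sure the YARN maximum allocation ('yarn.scheduler.maximum-allocation-mb') is at least as large as what you are asking for per executor, which is the executor memory plus the memory overhead. The overhead is at least 384 MB; recent Spark versions default to the larger of 384 MB and 10% of the executor memory.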

The following parameters are used to allocate Spark executors and driver memory:

spark.executor.instances -- number of Spark executors
spark.executor.memory -- memory per Spark executor (the overhead of at least 384 MB is added on top)
spark.driver.memory -- memory for the Spark driver
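The container-size check can be sketched as simple arithmetic. This is only an illustration: the object and function names are hypothetical, and the overhead rule (the larger of 384 MB and 10% of executor memory) is the default in recent Spark versions, so check it against your version and any explicit overhead setting. YARN also rounds requests up to a multiple of its minimum allocation, which this sketch does not model.

```scala
object ExecutorSizing {
  // Assumption: default overhead = max(384 MB, 10% of executor memory).
  def overheadMb(executorMemoryMb: Long): Long =
    math.max(384L, (executorMemoryMb * 0.10).toLong)

  // Approximate memory YARN is asked for per executor container.
  def containerRequestMb(executorMemoryMb: Long): Long =
    executorMemoryMb + overheadMb(executorMemoryMb)

  // True when the request fits under yarn.scheduler.maximum-allocation-mb.
  def fitsInYarn(executorMemoryMb: Long, yarnMaxAllocationMb: Long): Boolean =
    containerRequestMb(executorMemoryMb) <= yarnMaxAllocationMb

  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 2048L // i.e. spark.executor.memory=2g
    val yarnMaxMb = 8192L        // hypothetical value of yarn.scheduler.maximum-allocation-mb
    println(s"container request: ${containerRequestMb(executorMemoryMb)} MB")
    println(s"fits in YARN max:  ${fitsInYarn(executorMemoryMb, yarnMaxMb)}")
  }
}
```

If the check fails, either lower spark.executor.memory or raise the YARN maximum allocation; otherwise the executors will not be granted.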

A 6 MB file is pretty small, much smaller than the HDFS block size, so you are probably getting a single partition until you do something to repartition it. Besides passing numPartitions when reading the file, you can repartition the data after it is loaded.

I would probably call one of these repartition methods on your DataFrame:

def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
Returns a new DataFrame partitioned by the given partitioning expressions into numPartitions. The resulting DataFrame is hash partitioned.

OR this:

def repartition(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.
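As a rough sketch of how the two repartition variants above might be used, the following builds a small DataFrame and repartitions it both ways. The object name, the column names `id` and `groupKey`, the partition count of 16, and the `local[4]` master are all illustrative assumptions, not values from the original question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionDemo {
  // Builds a small stand-in DataFrame (about 100,000 rows, like the input in the question).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    (1 to 100000).map(i => (i, i % 50)).toDF("id", "groupKey")
  }

  // Round-robin style redistribution into a fixed number of partitions.
  def evenly(df: DataFrame, n: Int): DataFrame = df.repartition(n)

  // Hash partitioning on a column: rows with the same key land in the same partition.
  def byKey(df: DataFrame, n: Int): DataFrame = df.repartition(n, col("groupKey"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-demo")
      .master("local[4]") // assumption: local mode for the demo; drop this when using spark-submit
      .getOrCreate()

    val df = sample(spark)
    println(s"evenly: ${evenly(df, 16).rdd.getNumPartitions} partitions")
    println(s"by key: ${byKey(df, 16).rdd.getNumPartitions} partitions")

    spark.stop()
  }
}
```

The plain variant is usually the better fit when the goal is just to spread heavy per-record work across executors. The column variant is useful when later steps group or join on that column, but a key with few distinct values can leave some partitions empty. A common starting point is a partition count of a small multiple of the total executor cores.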
