Created 08-09-2016 08:28 PM
The issue is that the input data files to Spark are very small, about 6 MB (fewer than 100,000 records). However, the required processing/calculations are heavy and would benefit from running across multiple executors. Currently, all processing runs on a single executor, even when multiple executors are specified to spark-submit.
Created 08-09-2016 08:43 PM
First, check whether your data is stored in a splittable format (Snappy, LZO, bzip2, etc.). If so, instruct Spark to split the data into multiple partitions on read. In Scala, you can do this:
val file = sc.textFile(path, numPartitions) // second argument is the minimum number of partitions
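For a file this small, a minimal sketch might look like the following; the path and partition count are hypothetical, and sc is the usual spark-shell SparkContext:

// Hypothetical path and partition count -- adjust to your data and cluster
val records = sc.textFile("/data/small_input.txt", 16)
println(s"partitions = ${records.partitions.length}") // confirm the split actually happened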
You will also need to tune your YARN container sizes to work with your executor allocation. Make sure the YARN maximum container allocation (yarn.scheduler.maximum-allocation-mb) is larger than what you are asking for per executor, including the default memory overhead of 384 MB.
The following parameters are used to allocate Spark executors and driver memory:
spark.executor.instances -- number of Spark executors
spark.executor.memory -- memory per Spark executor (plus 384 MB overhead)
spark.driver.memory -- memory for the Spark driver
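As a rough sketch of how these settings fit together (the values below are made up for illustration, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only. In practice these are usually passed on the
// spark-submit command line (--num-executors, --executor-memory,
// --driver-memory) or set in spark-defaults.conf; spark.driver.memory in
// particular must be set before the driver JVM starts.
// Executor memory + 384 MB overhead must fit under
// yarn.scheduler.maximum-allocation-mb.
val conf = new SparkConf()
  .setAppName("small-input-heavy-compute")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)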
A 6 MB file is pretty small, much smaller than the HDFS block size, so you are probably getting a single partition until you do something to repartition it. You can also set the numPartitions parameter on textFile as shown above.
I would probably call one of these repartition methods on your DataFrame:
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
Returns a new DataFrame partitioned by the given partitioning expressions into numPartitions. The resulting DataFrame is hash partitioned.
OR this:
def repartition(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.
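Putting that together, a minimal sketch might look like this; the partition count (16) and the column name ("key") are hypothetical and should reflect how your heavy computation groups its work:

import org.apache.spark.sql.functions.col

// df is the DataFrame built from the 6 MB input (hypothetical)
val spreadEvenly = df.repartition(16)              // exactly 16 partitions
val spreadByKey  = df.repartition(16, col("key"))  // hash-partitioned on "key"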