I'm performing a groupBy, a count, and an outer join with another DataFrame of ~200 MB (~80 MB when cached, though I don't need to cache it), then saving the result to disk. Right now it takes about 55 minutes, and I've been trying to tune it.
I read in the Spark Tuning Guide that "In general, we recommend 2-3 tasks per CPU core in your cluster." This suggests I should have about 30-50 tasks instead of 15000, with each task being much bigger. Is my understanding correct, and is this what's recommended? I've read in different sources to decrease parallelism, to increase it (via spark.default.parallelism or by changing the block size), or even to keep the default.
Thank you for your help!
Is it possible to increase the block size of the data? I know you already have the data, so this means some extra work. But if you can increase the block size, your number of tasks will go down, and it shouldn't hurt because more of your data will sit together. Simply moving to 256 MB blocks would reduce your number of tasks to about 7500. Maybe increase it further and see whether it helps? Find your sweet spot for the block size first, then look at other options. I don't think you should go beyond 512 MB, but you'll have to find out. What is the data format?
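To see how block size drives the task count, here is a small plain-Python sketch (not a Spark API). It assumes one input split per HDFS block and ignores Spark's own split-size settings, so treat the results as estimates. For a 1.7 TiB file with 128 MiB blocks it gives about 13,900 tasks, the same ballpark as the ~15000 in the question, and each doubling of the block size roughly halves the count.

```python
import math

MIB = 1024 ** 2
TIB = 1024 ** 4


def estimated_input_tasks(file_sizes_bytes, block_size_bytes):
    """Estimate input tasks, assuming one split per block of each file."""
    return sum(math.ceil(size / block_size_bytes) for size in file_sizes_bytes)


# Hypothetical input: a single ~1.7 TiB CSV file, as described in the thread.
big_csv = int(1.7 * TIB)
for block_mib in (64, 128, 256, 512):
    tasks = estimated_input_tasks([big_csv], block_mib * MIB)
    print(f"{block_mib:>3} MiB blocks -> ~{tasks} tasks")
```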
The data is two CSV files: one is about 1.7 TB and the other about 200 MB.
Update: I tried a block size of 64 MB, and again there was no difference.
I tried a block size of 256 MB and there was no difference in time (see screenshot).
The block size is shown as 256 MB.
The task deserialization and GC times remain the same as before increasing the block size.
The input size for each executor is roughly the same (the largest and smallest input sizes are about 12-15 GB apart). Is this normal, or is this considered data skew?
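One rough way to put a number on that spread is to compare the largest per-executor input with the mean. The plain-Python sketch below assumes you copy the per-executor input sizes from the Executors tab; the sample values are hypothetical placeholders, and "max over mean" is just an illustrative ratio, not a Spark metric. A 12-15 GB gap matters much less when each executor reads hundreds of gigabytes than when each reads 20 GB, and the ratio makes that explicit.

```python
from statistics import mean


def skew_report(input_gb_by_executor):
    """Summarize how uneven the per-executor input sizes are."""
    sizes = list(input_gb_by_executor.values())
    avg = mean(sizes)
    return {
        "mean_gb": avg,
        "spread_gb": max(sizes) - min(sizes),
        # Largest input divided by the mean; 1.0 means perfectly even.
        "max_over_mean": max(sizes) / avg,
    }


# Hypothetical numbers: five executors whose inputs differ by 15 GB.
sample = {"exec-1": 340, "exec-2": 335, "exec-3": 350, "exec-4": 345, "exec-5": 338}
print(skew_report(sample))
```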
Thank you for replying.
Again, see below: the block size doesn't really change anything apart from the total number of tasks. You still have the same number of tasks running at the same time, which is determined by executor-cores. Can you run top on the machines while the job is running? If CPU utilization is low, you want to increase executor-cores; if it is close to the maximum, reducing it a bit might help (to reduce task switching). However, if that doesn't change anything, you might just have to live with the performance, or buy more nodes.
You do not need to reduce the total number of tasks to 2x the cores; you need the number of tasks that run AT THE SAME TIME to be 2-3 per core (so 5 * 16 * 2 = 160). You also don't need to change the block size or anything else.
Also, executors work best with 10-50 GB of RAM, so executors of around 24 GB seem fine. You can then set the number of tasks running at once with the --executor-cores y flag, which means an executor can run y tasks at the same time. In your case 16 * 2-3 might be a good value. The 15000 tasks will then be executed in batches, one after another. You can tune these parameters accordingly. (You can also try two smaller executors per node, since garbage collection for long-running tasks is a real concern, but as I said, 24 GB is not too much.)
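The "2-3 tasks per core" rule is about concurrency, not the total number of tasks. As a plain-Python sketch, assuming 5 nodes with 16 physical cores each (the figures used in this answer), the target number of simultaneously running tasks works out as follows:

```python
def concurrent_task_target(nodes, cores_per_node, tasks_per_core=(2, 3)):
    """Return the (low, high) number of tasks to run at the same time."""
    physical_cores = nodes * cores_per_node
    low, high = tasks_per_core
    return physical_cores * low, physical_cores * high


# 5 nodes x 16 cores = 80 physical cores -> 160 to 240 concurrent tasks
print(concurrent_task_target(5, 16))
```

With one executor per node, that range corresponds to roughly 32-48 task slots per executor, which is the "16 * 2-3" figure above.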
Hi Benjamin, thanks for replying.
I'm confused about what you said regarding 16*2-3 being a good value for executor-cores. From what I know, executor-cores should be at most the number of cores in a single node (so executor-cores <= 16 here). Or do you mean setting executor-cores to 2-3?
The number of executor cores is the number of task "slots" in the executor. This is what "you want 2-3x the physical cores" refers to: you want Spark to run more tasks at the same time than the CPU has cores (there is hyper-threading and some overhead). So, assuming you have 15000 tasks and 100 executor cores in total, Spark will run them in 150 "waves". Think of it as a YARN inside YARN: each executor hands out its task slots much like YARN hands out containers.
You also have vcores in YARN, and executor cores are translated into vcore requests, but vcore scheduling is normally not switched on, so they are purely ornamental (i.e., YARN only uses memory for assignment).
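To make the "waves" idea concrete: the number of task slots is executors x executor-cores, and a stage with more tasks than slots runs in successive rounds. A minimal plain-Python sketch, assuming every task takes about the same time (real stages are rarely that uniform); the 25 x 4 split of the 100 cores is a hypothetical layout:

```python
import math


def task_waves(total_tasks, num_executors, executor_cores):
    """Rounds needed if each executor runs executor_cores tasks at once."""
    slots = num_executors * executor_cores
    return math.ceil(total_tasks / slots)


# 15000 tasks on 100 slots in total (hypothetically 25 executors x 4 cores)
print(task_waves(15000, 25, 4))
# 15000 tasks on 5 executors x 32 cores = 160 slots
print(task_waves(15000, 5, 32))
```

The first call gives the 150 waves mentioned above; the second shows that 160 slots bring it down to 94.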
Try making your join a broadcast join. By default, Spark SQL does a broadcast join only for tables smaller than 10 MB. In this case I think it would make a lot of sense to change the setting spark.sql.autoBroadcastJoinThreshold to 250 MB. In MapReduce terms this does a map-side join, and it should be much quicker than what you're experiencing.
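For intuition about why a broadcast join avoids the shuffle: the small table becomes an in-memory hash map that every task receives, and each task streams its partition of the large table past it. The plain-Python sketch below models a left outer join with the small table on the right (broadcast) side; the tables are hypothetical, and this is a model of the idea, not Spark's implementation. In PySpark the hint itself is pyspark.sql.functions.broadcast(small_df). Keep in mind that Spark's broadcast hash join covers inner and left/right outer joins but not full outer joins, so a full outer join falls back to a shuffle-based join even with the hint.

```python
from collections import defaultdict


def broadcast_left_outer_join(large_rows, small_rows, key):
    """Left outer join where the small side is hashed once and 'broadcast'.

    large_rows and small_rows are lists of dicts; key is the join column.
    """
    # Build phase: hash the small (right) side once; Spark ships this to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)
    right_cols = {col for row in small_rows for col in row if col != key}

    # Probe phase: stream the large side; its rows never need to be shuffled.
    for left in large_rows:
        matches = lookup.get(left[key])
        if matches:
            for right in matches:
                yield {**left, **right}
        else:
            # Unmatched left rows are kept, with the right-side columns set to None.
            yield {**left, **{col: None for col in right_cols}}


events = [{"id": 1, "n": 10}, {"id": 2, "n": 20}, {"id": 3, "n": 30}]  # stands in for the large side
labels = [{"id": 1, "label": "a"}, {"id": 3, "label": "c"}]  # stands in for the small side
for row in broadcast_left_outer_join(events, labels, "id"):
    print(row)
```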
Also, don't worry about having a large number of tasks; it's perfectly fine to have that many. I've found that beyond 4 cores per executor you get diminishing returns on performance (i.e., 4 cores = 400% throughput, 5 cores is ~430% throughput).
Another setting you might want to investigate is spark.sql.shuffle.partitions. It is the number of partitions to use when shuffling data for joins (and aggregations), and it defaults to 200. I think you might want to raise that number quite a bit.
Hi Joe, thanks for the suggestions. I have the broadcast join threshold set to 300 MB, but the SQL tab in the UI doesn't mention a broadcast join anywhere. Is this because the resulting physical plan found a more optimal join than a broadcast?
As for shuffle partitions, I currently have it set to 611 because the UI shows a shuffle read/write size of about 7.8 GB, and I want each partition to be ~128 MB => 7800 MB / 128 MB = 611 partitions. Will update with results.
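The sizing rule used here (total shuffle size divided by a target partition size) fits in a tiny helper. This is plain Python, and the 128 MB target is the figure from this post, not a Spark default. Running the numbers, 7800 MB / 128 MB comes to about 61 partitions, whereas 611 partitions at 128 MB each corresponds to roughly 78 GB of shuffle data, so it's worth double-checking which shuffle size the UI actually reported.

```python
import math


def shuffle_partitions(shuffle_mb, target_partition_mb=128):
    """Pick a partition count so each shuffle partition is ~target_partition_mb."""
    return max(1, math.ceil(shuffle_mb / target_partition_mb))


print(shuffle_partitions(7800))   # ~7.8 GB of shuffle data
print(shuffle_partitions(78200))  # ~78 GB of shuffle data
```

In PySpark the resulting value would be applied with spark.conf.set("spark.sql.shuffle.partitions", n) on Spark 2.x, or sqlContext.setConf(...) on 1.x.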
Use the explain API: if a broadcast join is happening, you should see something like BroadcastRDD (in a DataFrame's physical plan, an operator such as BroadcastHashJoin). Also, make sure you've enabled code generation for Spark SQL with "spark.sql.codegen=true"; older versions of Spark (1.4 and earlier) have it set to false.
I used explain() and here is what I found.
I found that without explicitly calling functions.broadcast(), there was no mention of broadcasting.
When I called broadcast(), there was a broadcast hint in the logical plans but the final physical plan did not mention broadcasting.
However, I do not see anything like a Broadcast RDD or an RDD (is this because I'm using DataFrames?)
For the broadcast join to work, you need to run ANALYZE TABLE [tablename] COMPUTE STATISTICS noscan. Otherwise, Spark won't know the table size it needs to decide on the broadcast (map-side) join. Also, you mention you are doing a groupBy. You should use reduceByKey instead; that will dramatically improve performance. Spark will do the count on the map side, then send the partial results to the reducers (if you want to think about it in MapReduce terms). Switching to reduceByKey alone should solve your performance issue.
Agreed 100%. If you can accomplish the same task with reduceByKey, it uses a combiner: it does the aggregation locally, then shuffles only the per-partition results. Just keep an eye on GC when doing this.
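To illustrate why a combiner helps, the plain-Python sketch below counts how many records would cross the network for a count-by-key in two styles: shipping every record to the reducers (groupByKey-like) versus pre-aggregating inside each partition first (reduceByKey-like; a DataFrame groupBy().count() also performs this partial aggregation before its shuffle). The partitions and keys are hypothetical, and the code models only the data movement, not Spark itself.

```python
from collections import Counter


def shuffle_without_combiner(partitions):
    """groupByKey-style: every (key, 1) record is shuffled, then counted."""
    shuffled = [(key, 1) for part in partitions for key in part]
    totals = Counter()
    for key, one in shuffled:
        totals[key] += one
    return totals, len(shuffled)


def shuffle_with_combiner(partitions):
    """reduceByKey-style: count inside each partition, shuffle only the partial counts."""
    partials = [Counter(part) for part in partitions]  # map-side combine
    shuffled = [item for partial in partials for item in partial.items()]
    totals = Counter()
    for key, count in shuffled:
        totals[key] += count
    return totals, len(shuffled)


partitions = [["a", "b", "a", "a"], ["b", "b", "a"], ["c", "a", "c"]]
print(shuffle_without_combiner(partitions))  # totals a=5, b=3, c=2; 10 records shuffled
print(shuffle_with_combiner(partitions))     # same totals; 6 records shuffled
```

Both approaches give the same counts; the combiner only changes how much data is shuffled, and the saving grows with the number of repeated keys per partition.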
I'm doing a groupBy on DataFrames. I considered converting to an RDD, doing reduceByKey, and converting back to a DataFrame, but DataFrames offer under-the-hood optimizations, so I shouldn't need to worry about losing the benefit of local aggregation.
I should have read the post a little more closely; I thought you were doing a groupByKey. You are correct: you need to use groupBy to keep the execution within the DataFrame and out of Python. However, you said you are doing an outer join. If it is a left join and the right side is larger than the left, do an inner join first, then do your left join on the result. The result will most likely be small enough to be broadcast for the left join. This is a pattern Holden described in one of her sessions at Strata this year.
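Here is one way to read that pattern, sketched in plain Python with hypothetical tables. First shrink the large right side with an inner join against the small left side's keys, keeping only the right-side rows that can match (effectively a left-semi join of the right side). Then run the left join against that reduced result, which is small enough to broadcast. Joining on the left keys only, rather than on full left rows, is an interpretation chosen so that repeated keys on the left don't duplicate rows. The helper names are made up for illustration.

```python
def inner_join_right_to_left_keys(left_rows, right_rows, key):
    """Step 1: keep only right-side rows whose key also appears on the left."""
    left_keys = {row[key] for row in left_rows}  # small set; broadcast-sized in Spark
    return [row for row in right_rows if row[key] in left_keys]


def left_join(left_rows, right_rows, key, right_cols):
    """Step 2: ordinary left outer join; right_rows is now small."""
    lookup = {}
    for row in right_rows:
        lookup.setdefault(row[key], []).append(row)
    out = []
    for left in left_rows:
        matches = lookup.get(left[key], [])
        if matches:
            out.extend({**left, **m} for m in matches)
        else:
            out.append({**left, **{c: None for c in right_cols}})
    return out


left = [{"k": 1, "x": "a"}, {"k": 2, "x": "b"}]  # small side
right = [{"k": 1, "y": 10}, {"k": 3, "y": 30}, {"k": 4, "y": 40}]  # large side
reduced = inner_join_right_to_left_keys(left, right, "k")
print(left_join(left, reduced, "k", ["y"]))
```

The two-step result equals a direct left join of left with the full right side, but only the reduced right side ever has to be broadcast.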