Created 07-30-2016 06:57 AM
I'm performing a groupBy, a count, and an outer join with another DataFrame that is about 200 MB in size (~80 MB when cached, although I don't need to cache it), then saving the result to disk. Right now the job takes about 55 minutes, and I've been trying to tune it.
I read in the Spark Tuning guide that "in general, we recommend 2-3 tasks per CPU core in your cluster." Does this mean I should have about 30-50 tasks instead of 15,000, with each task being much bigger? Is my understanding correct, and is this recommended? I've read conflicting advice from different sources: decrease parallelism, increase it (via spark.default.parallelism or by changing the block size), or just leave it at the default.
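For context, here is a simplified sketch of the job (PySpark 2.x API assumed; paths, the "key" column, and the partition count are placeholders, not my real values). I've also seen spark.sql.shuffle.partitions mentioned as the setting that governs DataFrame shuffles, so that's what the sketch sets:

```python
# Simplified sketch of my job (placeholder paths/values, PySpark 2.x API).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tuning-sketch").getOrCreate()

# Example value only: roughly 2-3x the total executor cores in the cluster.
# This controls the number of tasks for DataFrame shuffles (joins, groupBy);
# spark.default.parallelism applies to RDD operations.
spark.conf.set("spark.sql.shuffle.partitions", "48")

big_df = spark.read.parquet("/path/to/big/input")   # placeholder path
lookup_df = spark.read.parquet("/path/to/lookup")   # the ~200 MB DataFrame

result = (big_df
          .groupBy("key").count()                   # "key" is a placeholder
          .join(lookup_df, "key", "left_outer"))

result.write.parquet("/path/to/output")             # placeholder path
```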
Thank you for your help!
Created 07-31-2016 11:14 PM
I used explain(), and here is what I found:
Without explicitly calling functions.broadcast(), there was no mention of broadcasting anywhere in the plan.
When I called broadcast(), a broadcast hint appeared in the logical plans, but the final physical plan still did not mention broadcasting.
I also don't see anything like a broadcast RDD, or any RDD at all (is that because I'm using DataFrames?).
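Roughly what I tried, as a sketch (paths and the join column "key" are placeholders):

```python
# Sketch of the two explain() runs I compared (placeholder paths/column).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

big_df = spark.read.parquet("/path/to/big/input")   # placeholder
lookup_df = spark.read.parquet("/path/to/lookup")   # the ~200 MB side

# Without the hint: no mention of broadcasting in the printed plans.
big_df.join(lookup_df, "key", "left_outer").explain(True)

# With the hint: the logical plans show a broadcast hint, but my final
# physical plan still does not mention broadcasting.
big_df.join(F.broadcast(lookup_df), "key", "left_outer").explain(True)
```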
Created 08-01-2016 02:09 AM
Can you share the explain plan?
Created 07-31-2016 09:28 PM
In order for the broadcast join to work, you need to run ANALYZE TABLE [tablename] COMPUTE STATISTICS NOSCAN. Otherwise, Spark won't know the table's size and can't choose a broadcast map join. Also, you mention you are doing a groupBy; you should use reduceByKey instead. That will dramatically improve performance: Spark will do the count on the map side and then send only the partial results to the reducers (if you want to think about it in MapReduce terms). Switching to reduceByKey alone should solve your performance issue.
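A rough sketch of what I mean, run from PySpark (the table name is a placeholder, and the threshold line is only needed if the small side is above the 10 MB default):

```python
# Sketch only: give the optimizer size statistics for the small table so it
# can pick a broadcast join. Table name and threshold value are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# NOSCAN computes the table size without scanning every row.
spark.sql("ANALYZE TABLE my_lookup_table COMPUTE STATISTICS NOSCAN")

# Default broadcast threshold is 10 MB; a ~200 MB table needs it raised
# (value is in bytes) if you want it broadcast automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(200 * 1024 * 1024))
```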
Created 07-31-2016 09:29 PM
Agreed 100%. If you can accomplish the same task using reduceByKey, it implements a combiner, so it basically does the aggregation locally and then shuffles the results for each partition. Just keep an eye on GC when doing this.
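A tiny RDD example of the difference (made-up data):

```python
# Toy example: reduceByKey combines within each partition before the shuffle;
# groupByKey ships every individual record across the network first.
from pyspark.sql import SparkSession

sc = SparkSession.builder.getOrCreate().sparkContext
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)], 2)

counts = pairs.reduceByKey(lambda x, y: x + y)  # map-side combine, then shuffle

# Same answer, but all four records are shuffled before counting.
counts_no_combine = pairs.groupByKey().mapValues(lambda vals: sum(1 for _ in vals))

print(counts.collect())             # e.g. [('b', 1), ('a', 3)]
print(counts_no_combine.collect())  # same result, more shuffle traffic
```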
Created 07-31-2016 10:25 PM
I'm doing a groupBy using DataFrames. I considered converting to an RDD, doing a reduceByKey, and then converting back to a DataFrame, but the DataFrame API already applies under-the-hood optimizations, so I shouldn't lose the benefit of local aggregation.
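To check that, I can look at the physical plan of the aggregation itself (path and column name are placeholders); my understanding is that it should show a partial aggregation before the exchange, which plays the same role as reduceByKey's map-side combine:

```python
# Sketch of the DataFrame aggregation I'm running (placeholder path/column).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/path/to/big/input")   # placeholder

counts = df.groupBy("key").count()              # "key" is a placeholder column
# The physical plan should show two aggregation steps around the shuffle
# exchange: a partial count per partition, then a final merge.
counts.explain()
```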
Created 08-01-2016 02:08 AM
If you call groupByKey on a DataFrame, it implicitly converts the DataFrame to an RDD, and you lose all the benefits of the optimizer for that step.
Created 08-01-2016 06:23 PM
I should have read the post a little closer; I thought you were doing a groupByKey. You are correct: you need to use groupBy to keep the execution within the DataFrame engine and out of Python. However, you said you are doing an outer join. If it is a left join and the right side is larger than the left, do an inner join first, then do your left join on the result. The result of the inner join will most likely be small enough to be broadcast for the left join. This is a pattern that Holden described at Strata this year in one of her sessions.
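One way to read that pattern as code (a sketch only; "key", the paths, and the DataFrame names are made up):

```python
# Hedged sketch of the inner-join-then-left-join pattern (placeholder names).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
left_df = spark.read.parquet("/path/to/left")    # smaller, driving side
right_df = spark.read.parquet("/path/to/right")  # larger right side

# Step 1: inner join shrinks the right side to only the keys that actually
# appear on the left.
matched_right = right_df.join(left_df.select("key").distinct(), "key", "inner")

# Step 2: the reduced result is often small enough to broadcast for the
# final left outer join.
result = left_df.join(F.broadcast(matched_right), "key", "left_outer")
```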