Support Questions


Tuning parallelism: increase or decrease?

Contributor
I am processing ~2 TB of HDFS data using DataFrames. The size of a task is equal to the HDFS block size, which happens to be 128 MB, leading to about 15,000 tasks. I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.

I'm performing groupBy, count, and an outer-join with another DataFrame of ~200 MB size (~80 MB cached but I don't need to cache it), then saving to disk. Right now it takes about 55 minutes, and I've been trying to tune it.

I read in the Spark Tuning guide: "In general, we recommend 2-3 tasks per CPU core in your cluster." This means that I should have about 30-50 tasks instead of 15,000, and each task would be much bigger in size. Is my understanding correct, and is this suggested? I've read advice from different sources to decrease parallelism, to increase it (by setting spark.default.parallelism or changing the block size), or even to keep the default.

Thank you for your help!
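
A quick back-of-the-envelope check of the numbers in the question, as a sketch only. It assumes the guide's 2-3 tasks per core applies to the total cores in the cluster (5 nodes x 16 cores) and that one input task is created per 128 MB block; the variable names are illustrative.

```python
import math

# Figures taken from the question above.
nodes = 5
cores_per_node = 16
data_bytes = 2 * 1024**4        # ~2 TB of input
block_bytes = 128 * 1024**2     # 128 MB HDFS block

total_cores = nodes * cores_per_node
input_tasks = math.ceil(data_bytes / block_bytes)

# Tuning-guide rule of thumb: 2-3 tasks per CPU core in the cluster.
low, high = 2 * total_cores, 3 * total_cores

# Scheduling "waves" needed if every core runs one task at a time.
waves = math.ceil(input_tasks / total_cores)

print(total_cores, input_tasks, (low, high), waves)
```

Under these assumptions the rule of thumb works out to roughly 160-240 tasks for the whole cluster (30-50 would correspond to a single 16-core node), and the block-sized input tasks would run in about 205 waves.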

16 REPLIES

Contributor

I used explain() and here is what I found.

I found that without explicitly calling functions.broadcast(), there was no mention of broadcasting.

When I called broadcast(), there was a broadcast hint in the logical plans but the final physical plan did not mention broadcasting.

However, I do not see anything like a Broadcast RDD or an RDD (is this because I'm using DataFrames?)
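
For anyone wanting to reproduce this check, here is a minimal sketch that compares the physical plan with and without functions.broadcast(). It assumes Spark 2.x or later (SparkSession) and uses tiny stand-in DataFrames; the app name, column names, and the function name show_join_plans are made up for illustration. Two things may be worth checking against the plan: spark.sql.autoBroadcastJoinThreshold defaults to 10 MB, so a ~200 MB table would not be broadcast automatically, and for an outer join a broadcast hash join can generally only broadcast the side that is not being preserved (the right side of a left outer join), so a hint on the other side may be ignored. Exact operator names vary by Spark version.

```python
def show_join_plans():
    """Print physical plans for a left outer join with and without a broadcast hint."""
    # Imported lazily so this file can be loaded without Spark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = (SparkSession.builder
             .master("local[2]")
             .appName("broadcast-plan-check")  # hypothetical app name
             .getOrCreate())

    # Tiny stand-ins for the large and small DataFrames in the thread.
    large = spark.range(0, 1000).withColumnRenamed("id", "key")
    small = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "label"])

    # Turn off automatic broadcasting so only the explicit hint can trigger it.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    plain = large.join(small, "key", "left_outer")
    hinted = large.join(broadcast(small), "key", "left_outer")

    plain.explain()   # typically a sort-merge join when broadcasting is off
    hinted.explain()  # a broadcast join operator is expected here if the hint is honored

    spark.stop()
```

Calling show_join_plans() in a local PySpark environment prints both plans side by side, which makes it easier to see whether the hint changes the join operator.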

Super Collaborator

Can you share the explain plan?

Contributor

For the broadcast join to work, you need to run ANALYZE TABLE [tablename] COMPUTE STATISTICS noscan. Otherwise, Spark won't know the table size and can't choose the broadcast (map-side) join. Also, you mention you are trying to do a groupBy. You need to use reduceByKey instead. That will dramatically increase the performance. Spark will do the count on the map side, then distribute the partial results to the reducers (if you want to think about it in map-reduce terms). Switching to reduceByKey alone should solve your performance issue.

Super Collaborator

Agreed 100%. If you can accomplish the same task using reduceByKey, it implements a combiner, so it basically does the aggregation locally within each partition, then shuffles only the partial results. Just keep an eye on GC when doing this.
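
To make the combiner point concrete, here is a plain-Python sketch (no Spark involved) that counts how many records would cross the shuffle with and without local aggregation. The partitions and function names are invented for illustration; this only models the idea and is not how Spark implements either operation.

```python
from collections import Counter

def shuffle_group_then_count(partitions):
    """groupByKey-style: every (key, 1) record crosses the shuffle."""
    shuffled = [(key, 1) for part in partitions for key in part]
    counts = Counter()
    for key, one in shuffled:
        counts[key] += one
    return dict(counts), len(shuffled)

def shuffle_combine_then_count(partitions):
    """reduceByKey-style: each partition pre-aggregates before the shuffle."""
    shuffled = []
    for part in partitions:
        local = Counter(part)            # map-side combine
        shuffled.extend(local.items())   # one record per key per partition
    counts = Counter()
    for key, partial in shuffled:
        counts[key] += partial
    return dict(counts), len(shuffled)

# Three hypothetical input partitions of keys.
partitions = [["a", "a", "b"], ["a", "b", "b", "b"], ["c", "a"]]
grouped, grouped_records = shuffle_group_then_count(partitions)
combined, combined_records = shuffle_combine_then_count(partitions)
print(grouped_records, combined_records)
```

Both approaches produce the same counts; the difference is only in how many records are shuffled, and the gap grows with the number of repeated keys per partition.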

Contributor

I'm doing a groupBy using DataFrames. I considered converting to an RDD, doing reduceByKey, and then converting back to a DataFrame, but DataFrames offer under-the-hood optimizations, so I shouldn't need to worry about losing the benefit of local aggregation.

Super Collaborator

If you call groupByKey on a DataFrame, it implicitly converts the DataFrame to an RDD. You lose all benefits of the optimizer for this step.

Contributor

I should have read the post a little closer; I thought you were doing a groupByKey. You are correct, you need to use groupBy to keep the execution within the DataFrame and out of Python. However, you said you are doing an outer join. If it is a left join and the right side is larger than the left, then do an inner join first. Then do your left join on the result. Your result will most likely be broadcast to do the left join. This is a pattern that Holden described at Strata this year in one of her sessions.
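
One possible reading of the inner-join-first pattern, sketched in plain Python rather than Spark: first shrink the right side to the rows whose keys appear on the left (the inner join), then left join against that smaller result. This is an interpretation, not necessarily the exact pattern from the talk; the helper functions and sample rows are hypothetical, and the sketch only checks that the two-step version returns the same rows as a direct left join.

```python
def inner_join(left, right, key):
    """Return merged rows for every left/right pair that shares the key."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]

def left_join(left, right, key, right_cols):
    """Keep every left row; fill right-hand columns with None when unmatched."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    out = []
    for l in left:
        matches = index.get(l[key])
        if matches:
            out.extend({**l, **r} for r in matches)
        else:
            out.append({**l, **{c: None for c in right_cols}})
    return out

left = [{"id": 1, "x": "p"}, {"id": 2, "x": "q"}, {"id": 3, "x": "r"}]
right = [{"id": 2, "y": "B"}, {"id": 2, "y": "B2"},
         {"id": 4, "y": "D"}, {"id": 5, "y": "E"}]

direct = left_join(left, right, "id", ["y"])

# Step 1: inner join the distinct left keys with the right side to shrink it.
left_keys = [{"id": k} for k in {row["id"] for row in left}]
reduced_right = inner_join(left_keys, right, "id")

# Step 2: left join against the reduced result, now small enough to broadcast.
two_step = left_join(left, reduced_right, "id", ["y"])

print(len(right), len(reduced_right), two_step == direct)
```

In Spark terms, the reduced result would be the candidate for a broadcast in the second join, assuming it ends up small enough.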