Tuning parallelism: increase or decrease?

Contributor
I am processing ~2 TB of HDFS data using DataFrames. The size of a task equals the HDFS block size, which happens to be 128 MB, leading to about 15000 tasks. I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.

I'm performing a groupBy, a count, and an outer join with another DataFrame of ~200 MB (~80 MB when cached, though I don't strictly need to cache it), then saving the result to disk. Right now it takes about 55 minutes, and I've been trying to tune it.
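Roughly, the job looks like the sketch below (it assumes the Spark 2.x SparkSession API; the paths, the column name "key", and the output format are placeholders, not my real ones):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("groupby-join-save").getOrCreate()

// Placeholder paths; the real inputs are a ~2 TB file and a ~200 MB file on HDFS.
val bigDf   = spark.read.option("header", "true").csv("hdfs:///data/big")
val smallDf = spark.read.option("header", "true").csv("hdfs:///data/small")

// groupBy + count on the big table, then an outer join with the small one.
val counts = bigDf.groupBy("key").count()
val joined = counts.join(smallDf, Seq("key"), "outer")

// Save the result back to disk (output format here is arbitrary).
joined.write.mode("overwrite").parquet("hdfs:///data/output")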

I read in the Spark Tuning guide that "in general, we recommend 2-3 tasks per CPU core in your cluster". This means that I should have about 30-50 tasks instead of 15000, and each task would be much bigger. Is my understanding correct, and is this recommended? I've read from different sources to decrease or increase parallelism (via spark.default.parallelism or by changing the block size), or even to keep it at the default.

Thank you for your help!

16 Replies

Super Guru

@jestin ma

Is it possible to increase the block size of the data? I know you already have the data in place, which means you will have to do some extra work. But if you can increase the block size, your number of tasks will go down, and it wouldn't hurt because more of your data will be sitting together. A simple 256 MB block would reduce your number of tasks to about 7500. Maybe increase it further and see the benefits? Find your sweet spot with the block size first and then look at other options. I don't think you should go beyond 512 MB, but you have to find out. What is the data format?
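For example, one rough way to make a copy of the data with a larger block size is to set dfs.blocksize before rewriting it through Spark; the paths below are placeholders, and you could equally re-upload the files with something like hdfs dfs -D dfs.blocksize=268435456 -put ... :

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Sketch: write a new copy of the data with a 256 MB HDFS block size.
// dfs.blocksize is the client-side property used for newly written files;
// existing files keep the block size they were written with.
spark.sparkContext.hadoopConfiguration.set("dfs.blocksize", (256L * 1024 * 1024).toString)

spark.read.option("header", "true").csv("hdfs:///data/big")
  .write.mode("overwrite").option("header", "true").csv("hdfs:///data/big_256mb")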

Contributor

My data is two CSV files: one is 1.7 TB and one is about 200 MB.

Update: I also tried a block size of 64 MB, and again there was no difference.

I tried using a block size of 256 MB and there was no difference in time (see the screenshot below).

[Screenshot: 6199-screen-shot-2016-07-30-at-95644-pm.png]

The block size is seen as 256 MB.

The task deserialization and GC times remain the same as before increasing the block size.

The input size per executor is spread the same as before (the largest and smallest executor input sizes are about 12-15 GB apart). Is this normal, or is this considered data skew?
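A quick, rough check for key skew on the (placeholder) join/group-by column, reusing the bigDf from the sketch in the question:

import org.apache.spark.sql.functions.desc

// Show the heaviest values of the join/group-by key; a handful of counts that
// are much larger than the rest would point to data skew.
bigDf.groupBy("key").count().orderBy(desc("count")).show(20, truncate = false)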

Thank you for replying.

Master Guru

Again, as I said below, the block size doesn't really change anything apart from the total number of tasks. You still have the same number of tasks running at the same time, which is defined by the executor-cores. Can you run top on the machines while the job is running? If your CPU utilization is low, you want to increase executor-cores; if it is close to max, reducing it a bit might help (to reduce task switching). However, if that doesn't change anything, you might just have to live with the performance, or buy more nodes.

Master Guru

You do not need to reduce the total number of tasks to 2x your cores; you need the number of tasks running AT THE SAME TIME to be 2-3 per core (so 5 * 16 * 2 = 160). You also don't need to change the block size or anything else.

Also, executors work best with 10-50 GB of RAM, so 24 GB executors or so seem fine. You can then set the number of concurrently running tasks with the --executor-cores y flag, which means an executor can run y tasks at the same time. In your case 16 * 2-3 might be a good value. The 15000 tasks will then be executed one batch after another. You can tune these parameters accordingly. (You can also try two smaller executors per node, since garbage collection for long-running executors is a real concern, but as said, 24 GB is not too much.)
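For reference, a sketch of those submit-side settings; the numbers are only the example figures from above, the SparkSession API is assumed, and the equivalent spark-submit flags are shown in the comment:

import org.apache.spark.sql.SparkSession

// Roughly equivalent to:
//   spark-submit --num-executors 5 --executor-cores 16 --executor-memory 24g ...
val spark = SparkSession.builder()
  .appName("tuning-sketch")
  .config("spark.executor.instances", "5")   // one executor per worker node
  .config("spark.executor.cores", "16")      // 16 concurrent task slots per executor
  .config("spark.executor.memory", "24g")    // leave headroom for the OS / YARN overhead
  .getOrCreate()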

http://enterprisesecurityinjava.blogspot.co.uk/2016/04/slides-from-running-spark-in-production.html

Contributor

Hi Benjamin, thanks for replying.

I'm confused about what you said regarding 16 * 2-3 being a good value for executors. From what I know, executor-cores should be set to at most the number of cores in a single node (so executor-cores < 16 here). Or do you mean setting the executor-cores value to 2-3?

Master Guru

The number of cores is the number of task "slots" in the executor. This is what "you want 2-3x of physical cores" refers to: you want to make sure that Spark runs more tasks at the same time than the CPU has physical cores (there is hyper-threading and some overhead). So assuming you have 15000 tasks and 100 executor cores in total, Spark will run them in 150 "waves". Think of it like a yarn in yarn.

Now you also have vcores in YARN, and the executor cores are translated into vcore requests, but normally vcore enforcement is not switched on and they are purely ornamental (i.e. YARN only uses memory for assignment).
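To make the "waves" arithmetic concrete, using the example figures from above:

// Back-of-the-envelope: how many waves of tasks the stage runs in.
val numTasks       = 15000   // total tasks in the stage
val totalTaskSlots = 100     // executor cores across the cluster (example figure)
val waves          = math.ceil(numTasks.toDouble / totalTaskSlots).toInt
println(s"$numTasks tasks over $totalTaskSlots slots = $waves waves")   // 150 waves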

Super Collaborator

Try setting your join to a broadcast join. By default, Spark SQL does a broadcast join for tables smaller than 10 MB. I think in this case it would make a lot of sense to change the setting "spark.sql.autoBroadcastJoinThreshold" to 250 MB. This performs a map-side join in MapReduce terms and should be much quicker than what you're experiencing.

Also, don't worry about having a large number of tasks. It's very OK to have that many. I've found that with any more than 4 cores per executor you get diminishing returns on performance (i.e. 4 cores = 400% throughput, while 5 cores is only ~430% throughput).

Another setting you might want to investigate is spark.sql.shuffle.partitions: the number of partitions used when shuffling data for joins and aggregations, which defaults to 200. I think you might want to raise that number quite a bit.
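Putting those two knobs together, a sketch might look like this; the values, paths, column name, and join type are illustrative only, and the threshold is given in bytes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()

// ~250 MB auto-broadcast threshold (in bytes) and more shuffle partitions than the default 200.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (250L * 1024 * 1024).toString)
spark.conf.set("spark.sql.shuffle.partitions", "600")

// You can also ask for the broadcast explicitly with the broadcast() hint.
val bigDf   = spark.read.option("header", "true").csv("hdfs:///data/big")
val smallDf = spark.read.option("header", "true").csv("hdfs:///data/small")
val joined  = bigDf.join(broadcast(smallDf), Seq("key"), "left_outer")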

Contributor

Hi Joe, thanks for the suggestions. I have the broadcast join threshold set to 300 MB, but my Spark SQL tab in the UI doesn't mention a broadcast join anywhere. Is this because the resulting physical plan found a join it considers more optimal than a broadcast?

As for shuffle partitions, I currently have it set to 611, because in the UI I see that the shuffle read/write size is about 7.8 GB, so I aim for partitions of ~128 MB => 7800 MB / 128 MB = 611 partitions. Will update with results.

Super Collaborator

Use the explain API; you should see something like BroadcastRDD in the plan if a broadcast join is happening. Also, make sure you've enabled code generation for Spark SQL ("spark.sql.codegen=true"); older versions of Spark (1.4 and earlier) have it set to false.
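For example, on the joined DataFrame from the sketches above, something like the following prints the plans so you can check which join strategy was picked; in the plan output a broadcast join typically appears as a BroadcastHashJoin operator:

// Print the physical plan (and, with 'true', the logical/optimized plans) and
// look for a broadcast join operator such as BroadcastHashJoin.
joined.explain(true)

// On Spark 1.x, code generation can be enabled via the SQL context, e.g.:
// sqlContext.setConf("spark.sql.codegen", "true")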