
DataFrames/Sets: Simple join is skewed due to join column skew




Hi, I'm currently trying to perform an outer join between two DataFrames/Datasets, one ~150 GB and one ~50 GB, on a column, id. In one of the datasets, id is skewed: there are many 0s, with the rest being unique IDs. In the other, id is not skewed. If I filter id != 0, then the join works well. If I don't, then the join does not complete for a very, very long time.

I have traced this problem to the hash partitioning on id, which sends the many 0-valued keys to a single partition. One executor ends up reading most of the shuffle data, and writing all of the shuffle output, as shown below.



[Screenshot: the task in question, assigned to a single executor.]


[Screenshot from one of the executors: a single thread spilling sort data, since the executor cannot hold 90%+ of the ~200 GB result in memory.]

Moreover, looking at the event timeline, I find that the executor assigned that task spends about 20% of its time reading shuffle data, 70% on computation, and 10% writing output data.

I have tried the following:

"Salting" the 0-valued keys with monotonically_increasing_id().mod(N)

- This doesn't seem to help, since I now have hundreds/thousands of keys, each with tens of thousands of occurrences.

- Should I increase N? Is there a way to use a random value mod N instead of monotonically_increasing_id()?
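On the random-salt question: monotonically_increasing_id() is sequential within a partition, so taking it mod N can still cluster salts, whereas a uniformly random salt in [0, N) spreads the hot key evenly. Below is a plain-Python sketch of the salt-and-replicate idea (not Spark API; the data and N are made up for illustration). In Spark SQL, one way to generate such a salt is something like (rand() * N).cast("int") from the built-in functions.

```python
import random
from collections import defaultdict

N = 4  # number of salt buckets; an illustrative choice

# Large side: 1000 rows with the skewed key 0, plus a few unique keys.
large = [(0, f"hot{i}") for i in range(1000)] + [(i, f"row{i}") for i in range(1, 10)]
# Small side: one row per key.
small = [(0, "dim0"), (1, "dim1"), (2, "dim2")]

# Salt the large side: append a random salt to each key.
salted_large = [((k, random.randrange(N)), v) for k, v in large]
# Replicate the small side once per salt value, so every salted key finds a match.
salted_small = [((k, s), v) for k, v in small for s in range(N)]

# A toy hash join on the salted composite key.
index = defaultdict(list)
for key, v in salted_small:
    index[key].append(v)
joined = [(k, lv, sv) for (k, s), lv in salted_large for sv in index[(k, s)]]

# The output matches an unsalted join, but the hot key is now spread
# across N partitions instead of one.
```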

Repartitioning on a column I know contains unique values

- This is overridden by Spark's sort-based shuffle manager, which hash-partitions on the skewed join column

- Is it possible to change this? Or does the join column have to be the one hashed and partitioned on for the join to work?

Broadcasting does not work for my large tables

Increasing/decreasing spark.sql.shuffle.partitions does not remedy the skew, since the 0-valued keys are still hashed to the same partition.
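A toy illustration of why the partition count can't help: the target partition is hash(key) mod numPartitions, and every row with the same key gets the same hash (Spark's HashPartitioner uses Murmur3 rather than Python's built-in hash, but the effect on identical keys is the same), so all the 0-keys land together no matter how many partitions exist.

```python
from collections import Counter

def partition_of(key, num_partitions):
    # Toy model of hash partitioning: identical keys always map to the
    # same partition, regardless of the partition count.
    return hash(key) % num_partitions

keys = [0] * 1000 + list(range(1, 101))  # skewed: 1000 zeros, 100 unique keys
for n in (200, 2000, 20000):
    load = Counter(partition_of(k, n) for k in keys)
    # One partition still receives all 1000 zero-keyed rows, so the
    # maximum per-partition load never drops as n grows.
    print(n, max(load.values()))  # max load stays 1000 for every n
```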


What I am considering currently is doing the join at the RDD level, but is there any level of control there that can solve my skewed data problem? Other than that, see the bolded questions above.

I would appreciate any suggestions/tips/experience with this. Thank you!



Re: DataFrames/Sets: Simple join is skewed due to join column skew

Technically the answer would be to use a HashPartitioner and specify the number of partitions. I generally just use df.repartition(numPartitions, columns), which will redistribute the data. numPartitions is not a required argument in 1.6, but it is in 1.5.x; I normally pass it anyway. Coalesce is not going to work for you in this instance.

Here's something to consider, though. Are you joining these large datasets with a left join? If so, is the 150 GB dataset on the right side? If so, then do an inner join first, and left join against that result. Your 50 GB dataset will be shuffled for the inner join, but the result probably won't be shuffled again, since it is already in the right location for the left join. Even if it is, it's only 50 GB (at most), so you still save a lot of memory and network I/O.
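A sanity check of that inner-then-left decomposition, using a toy hash join in plain Python (an illustration, not Spark API; it assumes the keys on the smaller/left side are unique, otherwise the second join would duplicate rows):

```python
from collections import defaultdict

def hash_join(left, right, how="inner"):
    """Toy hash join on the first tuple element ('inner' or 'left')."""
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    out = []
    for k, v in left:
        if index[k]:
            out.extend((k, v, m) for m in index[k])
        elif how == "left":
            out.append((k, v, None))
    return out

small = [(1, "a"), (2, "b"), (3, "c")]   # stands in for the 50 GB side (unique keys)
big = [(1, "x"), (1, "y"), (4, "z")]     # stands in for the 150 GB side

# Direct left join, versus inner join first and then left join onto the result.
direct = hash_join(small, big, "left")
inner = hash_join(small, big, "inner")      # only the small side would shuffle
inner_kv = [(k, bv) for k, _, bv in inner]  # keep key -> big-side value
two_step = hash_join(small, inner_kv, "left")
```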


Re: DataFrames/Sets: Simple join is skewed due to join column skew


Do you think filtering on the skewed value and splitting the skewed dataset into two DataFrames, one with all the skewed rows and the other with the remaining rows (repartitioning if necessary), then outer joining each with the second dataset, would help performance? We could union the result datasets afterwards and get the same end result.
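The split-and-union idea can be sketched in plain Python (using an inner join for brevity and made-up data; for an outer join, the unmatched rows of the non-split side would need extra care so they aren't emitted twice):

```python
from collections import defaultdict

def inner_join(left, right):
    # Toy hash join on the first tuple element.
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    return [(k, lv, rv) for k, lv in left for rv in index[k]]

skewed_ds = [(0, i) for i in range(1000)] + [(k, -k) for k in range(1, 6)]
other_ds = [(0, "zero"), (2, "two"), (3, "three")]

# Split on the skewed value, join each half separately, then union.
hot = [row for row in skewed_ds if row[0] == 0]    # in Spark, this half could
rest = [row for row in skewed_ds if row[0] != 0]   # use a broadcast join
combined = inner_join(hot, other_ds) + inner_join(rest, other_ds)
```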


Re: DataFrames/Sets: Simple join is skewed due to join column skew


A hash partitioner will not solve the problem; you need to replicate the keys.
