Support Questions

Find answers, ask questions, and share your expertise

How to reduce Spark shuffling caused by join with data coming from Hive

avatar
Explorer

Hello,

I am loading data from Hive table with Spark and make several transformations including a join between two datasets.

This join is causing a large volume of data shuffling (read) making this operation is quite slow.

To avoid this such shuffling, I imagine that data in Hive should be splitted accross nodes according the fields used for join.

But how to do it in practice?

Using Hive bucketing ?

Thank you in advance for your suggestions.

1 ACCEPTED SOLUTION

avatar
Contributor

Hi Jean,

Can you please try the following and let us know if the query performance improved ?

1. set up the shuffle partitions to a higher number than 200, because 200 is default value for shuffle partitions. ( spark.sql.shuffle.partitions=500 or 1000)

2. while loading hive ORC table into dataframes, use the "CLUSTER BY" clause with the join key. Something like,

df1 = sqlContext.sql("SELECT * FROM TABLE1 CLSUTER BY JOINKEY1")

df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")

df1.registerTempTable("df1_tbl")

df2.registerTempTable("df2_tbl")

Now join df1_tbl & df2_tbl using joinkey1 & joinkey2.

View solution in original post

7 REPLIES 7

avatar
Super Collaborator

Hi @Jean-Sebastien Gourdet,

There are couple of options available to reduce the shuffle (not eliminate in some cases)

  • Using the broadcast variables

By using the broad cast variable, you can eliminate the shuffle of a big table, however you must broadcast the small data across all the executors

This may not be feasible all the cases, if both tables are big.

  • The other alternative (good practice to implement) is to implement the predicated pushdown for Hive data, this filters only the data which is required for the computation at the Hive Level and extract small amount of data.

This may not avoid complete shuffle but certainly speed up the shuffle as the amount of the data which pulled to memory will reduce significantly ( in some cases)

sqlContext.setConf("spark.sql.orc.filterPushdown",
"true")  -- If
you are using ORC files / spark.sql.parquet.filterPushdown in case of Parquet
files.
  • Last but not recommended approach is to extract form single partition by keeping the option .repartitin(1) to the DataFrame you will be avoided the shuffle but all the data will not count on parallelism as the single executor participate on the operation.

On the other note, the shuffle will be quick if the data is evenly distributed (key being used to join the table).

avatar
Explorer

Thanks for feedback.

For broadcast variables, it is not so much applicable in my case as I have big tables.

Concerning filterpushdown, it has not brought results, on the contrary, execution time took longer.

avatar
Contributor

Hi Jean,

Can you please try the following and let us know if the query performance improved ?

1. set up the shuffle partitions to a higher number than 200, because 200 is default value for shuffle partitions. ( spark.sql.shuffle.partitions=500 or 1000)

2. while loading hive ORC table into dataframes, use the "CLUSTER BY" clause with the join key. Something like,

df1 = sqlContext.sql("SELECT * FROM TABLE1 CLSUTER BY JOINKEY1")

df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")

df1.registerTempTable("df1_tbl")

df2.registerTempTable("df2_tbl")

Now join df1_tbl & df2_tbl using joinkey1 & joinkey2.

avatar
Explorer

Thank you for feedback.

1. Increasing shuffle.partitions led to error :

Total size of serialized results of 153680 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

2. Using CLUSTER BY in the select reduced data shuffling from 250 GB to 1 GB and execution time was reduced from 13min to 5min. So it is a good gain.

However, I was expecting that I could persist this bucketing to have a minimum shuffling, but it seems that it is not possible, Hive and Spark are not really compatible on this topic.

avatar
Contributor

You can persist the data with partitioning by using the partitionBy(colName) while writing the data frame to a file. The next time you use the dataframe, it wont cause shuffles.

There is a JIRA for the issue you mentioned, which is fixed in 2.2. You can still workaround by increasing driver.maxResult size.

  1. SPARK-12837

avatar
New Contributor

how will i avoid shuffle if i have to join both the data frames on 2 join keys,

df1 = sqlContext.sql("SELECT * FROM TABLE1 CLSUTER BY JOINKEY1,JOINKEY2")

df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY1,JOINKEY2")

df3 = sqlContext.sql("SELECT * FROM TABLE3 CLUSTER BY JOINKEY1,JOINKEY2")

df4=df1.join(df2, df1.JOINKEY1=df2.JOINJEY1 and df1.JOINKEY2=df2.JOINKEY2, "inner")

df5 =df1.except(df4).union(df3)

avatar
New Contributor

Wont it results into Shuffle Spill without proper memory configuration in Spark Context?