Created 06-12-2017 07:00 AM
Hello,
I am loading data from a Hive table with Spark and making several transformations, including a join between two datasets.
This join is causing a large volume of data shuffling (read), making the operation quite slow.
To avoid such shuffling, I imagine that the data in Hive should be split across nodes according to the fields used for the join.
But how can this be done in practice?
Using Hive bucketing?
Thank you in advance for your suggestions.
Created 06-14-2017 02:04 PM
Hi Jean,
Can you please try the following and let us know if the query performance improves?
1. Set the shuffle partitions to a number higher than 200, which is the default value (e.g. spark.sql.shuffle.partitions=500 or 1000).
2. While loading the Hive ORC table into DataFrames, use the CLUSTER BY clause with the join key. Something like:
df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")
df1.registerTempTable("df1_tbl")
df2.registerTempTable("df2_tbl")
Now join df1_tbl & df2_tbl using joinkey1 & joinkey2.
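Roughly, both steps together would look something like this (a sketch only; the table and key names are the placeholders from above, and 500 is just an example value, not a tuned figure):
# Sketch: raise the shuffle partition count, then join the two temp tables
# registered above; TABLE1/TABLE2 and JOINKEY1/JOINKEY2 are placeholder names.
sqlContext.setConf("spark.sql.shuffle.partitions", "500")
df_joined = sqlContext.sql("""
    SELECT a.*, b.*
    FROM df1_tbl a
    JOIN df2_tbl b
      ON a.JOINKEY1 = b.JOINKEY2
""")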
Created 06-14-2017 04:33 AM
There are a couple of options available to reduce the shuffle (though not eliminate it in some cases).
By using a broadcast variable, you can eliminate the shuffle of the big table; however, you must broadcast the small table's data across all the executors.
This may not be feasible in all cases, e.g. if both tables are big.
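Roughly, a broadcast join would look something like this (a sketch only; df_big, df_small and the key name are placeholders, not your actual tables):
from pyspark.sql.functions import broadcast

# Sketch: hint Spark to broadcast the small table so the big table is not shuffled;
# df_big, df_small and JOINKEY1 are placeholder names.
result = df_big.join(broadcast(df_small), df_big.JOINKEY1 == df_small.JOINKEY1)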
Another option is filter pushdown. This may not avoid the shuffle completely, but it can certainly speed it up, as the amount of data pulled into memory will be reduced significantly (in some cases):
sqlContext.setConf("spark.sql.orc.filterPushdown", "true") -- if you are using ORC files; use spark.sql.parquet.filterPushdown for Parquet files.
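For example, combined with a selective filter it might look like this (a sketch only; the table name and the load_date column are made-up placeholders):
# Sketch: enable ORC predicate pushdown, then filter so less data is read and shuffled.
# TABLE1 and load_date are placeholder names, not from the actual schema.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
df1 = sqlContext.sql("SELECT * FROM TABLE1 WHERE load_date = '2017-06-01'")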
On another note, the shuffle will be quicker if the data is evenly distributed across the key being used to join the tables.
Created 06-15-2017 07:27 AM
Thanks for the feedback.
Broadcast variables are not really applicable in my case, as my tables are big.
Concerning filter pushdown, it has not brought results; on the contrary, execution time was longer.
Created 06-15-2017 07:31 AM
Thank you for the feedback.
1. Increasing spark.sql.shuffle.partitions led to this error:
Total size of serialized results of 153680 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
2. Using CLUSTER BY in the SELECT reduced data shuffling from 250 GB to 1 GB, and execution time was reduced from 13 min to 5 min. So it is a good gain.
However, I was expecting to be able to persist this bucketing so as to have minimal shuffling, but it seems that is not possible; Hive and Spark are not really compatible on this topic.
Created 06-15-2017 07:25 PM
You can persist the data with partitioning by using partitionBy(colName) while writing the DataFrame to a file. The next time you use the DataFrame, it won't cause shuffles.
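As a rough sketch of that suggestion (the output path and column name are placeholders):
# Sketch: write the DataFrame partitioned by the join key, then read it back later.
# The path and the JOINKEY1 column are placeholder names.
df1.write.partitionBy("JOINKEY1").parquet("/tmp/table1_by_joinkey1")
df1_part = sqlContext.read.parquet("/tmp/table1_by_joinkey1")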
There is a JIRA for the issue you mentioned, which is fixed in Spark 2.2. You can still work around it by increasing spark.driver.maxResultSize.
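For the workaround, the limit is typically set when the context is created or via spark-submit --conf; for example (a sketch only, 2g is an arbitrary example value):
from pyspark import SparkConf, SparkContext

# Sketch: set spark.driver.maxResultSize on the SparkConf before creating the context.
conf = SparkConf().set("spark.driver.maxResultSize", "2g")
sc = SparkContext(conf=conf)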
Created 07-28-2017 08:19 AM
How will I avoid the shuffle if I have to join the data frames on 2 join keys? Something like:
df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1, JOINKEY2")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY1,JOINKEY2")
df3 = sqlContext.sql("SELECT * FROM TABLE3 CLUSTER BY JOINKEY1,JOINKEY2")
df4 = df1.join(df2, (df1.JOINKEY1 == df2.JOINKEY1) & (df1.JOINKEY2 == df2.JOINKEY2), "inner")
df5 = df1.subtract(df4).union(df3)
Created 10-02-2020 12:46 AM
Won't it result in shuffle spill without proper memory configuration in the Spark context?