<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: How to reduce Spark shuffling caused by join with data coming from Hive in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205791#M62703</link>
    <description>&lt;P&gt;Thank you for the feedback.&lt;/P&gt;&lt;P&gt;1. Increasing shuffle.partitions led to the error:&lt;/P&gt;&lt;P&gt;Total size of serialized results of 153680 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)&lt;/P&gt;&lt;P&gt;2. Using CLUSTER BY in the select reduced data shuffling from 250 GB to 1 GB, and execution time dropped from 13 min to 5 min, so it is a good gain.&lt;/P&gt;&lt;P&gt;However, I was expecting to be able to persist this bucketing so as to get minimal shuffling, but it seems that is not possible; Hive and Spark are not really compatible on this topic.&lt;/P&gt;</description>
    <pubDate>Thu, 15 Jun 2017 14:31:11 GMT</pubDate>
    <dc:creator>jgourdet</dc:creator>
    <dc:date>2017-06-15T14:31:11Z</dc:date>
    <item>
      <title>How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205787#M62699</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I am loading data from a Hive table with Spark and performing several transformations, including a join between two datasets.&lt;/P&gt;&lt;P&gt;This join causes a large volume of data shuffling (read), making the operation quite slow.&lt;/P&gt;&lt;P&gt;To avoid such shuffling, I imagine that the data in Hive should be split across nodes according to the fields used for the join.&lt;/P&gt;&lt;P&gt;But how to do that in practice?&lt;/P&gt;&lt;P&gt;Using Hive bucketing?&lt;/P&gt;&lt;P&gt;Thank you in advance for your suggestions.&lt;/P&gt;</description>
      <pubDate>Mon, 12 Jun 2017 14:00:06 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205787#M62699</guid>
      <dc:creator>jgourdet</dc:creator>
      <dc:date>2017-06-12T14:00:06Z</dc:date>
    </item>
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205788#M62700</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/16921/jgourdet.html" nodeid="16921"&gt;@Jean-Sebastien Gourdet&lt;/A&gt;,&lt;/P&gt;&lt;P&gt;There are couple of options
available to reduce the shuffle (not eliminate in some cases)&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Using the broadcast
variables&lt;/LI&gt;&lt;/UL&gt;&lt;P style="margin-left: 40px;"&gt;By using
the broad cast variable, you can eliminate the shuffle of a big table, however
you must broadcast the small data across all the executors &lt;/P&gt;&lt;P style="margin-left: 40px;"&gt;This
may not be feasible all the cases, if both tables are big.&lt;/P&gt;&lt;P style="margin-left: 40px;"&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;The other
alternative (good practice to implement) is to implement the predicated
pushdown for Hive data, this filters only the data which is required for the
computation at the Hive Level and extract small amount of data.&lt;/LI&gt;&lt;/UL&gt;&lt;P style="margin-left: 40px;"&gt;This may not avoid
complete shuffle but certainly speed up the shuffle as the amount of the data
which pulled to memory will reduce significantly ( in some cases)&lt;/P&gt;&lt;PRE&gt;sqlContext.setConf("spark.sql.orc.filterPushdown",
"true")  -- If
you are using ORC files / spark.sql.parquet.filterPushdown in case of Parquet
files.&lt;/PRE&gt;&lt;UL&gt;
&lt;LI&gt;Last but &lt;STRONG&gt;not&lt;/STRONG&gt; &lt;STRONG&gt;&lt;EM&gt;recommended&lt;/EM&gt;&lt;/STRONG&gt; approach is
to extract form single partition by keeping the option .repartitin(1) to the DataFrame
you will be avoided the shuffle but all the data will not count on parallelism
as the single executor participate on the operation.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;On the other note, the
shuffle will be quick if the data is evenly distributed (key being used to join
the table).&lt;/P&gt;</description>
      <pubDate>Wed, 14 Jun 2017 11:33:12 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205788#M62700</guid>
      <dc:creator>bkosaraju</dc:creator>
      <dc:date>2017-06-14T11:33:12Z</dc:date>
    </item>
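The broadcast-join idea from the answer above can be modelled in plain Python (an illustrative sketch, not Spark code; all names below are made up): the small table is copied in full as an in-memory lookup table, so the big table is probed in place and never moved.

```python
# Plain-Python model of a broadcast (map-side) join: the small table is
# copied to every worker as a dict, so the big table is never shuffled.
def broadcast_join(big_rows, small_rows, key_big, key_small):
    lookup = {r[key_small]: r for r in small_rows}  # the "broadcast" copy
    for row in big_rows:
        match = lookup.get(row[key_big])
        if match is not None:
            yield {**row, **match}  # inner join: keep only matching rows

big = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}, {"id": 3, "amount": 5}]
small = [{"id": 1, "region": "EU"}, {"id": 3, "region": "US"}]
joined = list(broadcast_join(big, small, "id", "id"))
```

In Spark itself the equivalent would be the broadcast hint, e.g. `df_big.join(broadcast(df_small), "id")` with `from pyspark.sql.functions import broadcast`; it only applies when the small side fits in each executor's memory.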
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205789#M62701</link>
      <description>&lt;P&gt;Hi Jean,&lt;/P&gt;&lt;P&gt;Can you please try the following and let us know if the query performance improves?&lt;/P&gt;&lt;P&gt;1. Set the shuffle partitions to a number higher than 200, since 200 is the default value for shuffle partitions (e.g. spark.sql.shuffle.partitions=500 or 1000).&lt;/P&gt;&lt;P&gt;2. While loading the Hive ORC table into DataFrames, use a "CLUSTER BY" clause on the join key. Something like:&lt;/P&gt;&lt;P&gt;df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1")&lt;/P&gt;&lt;P&gt;df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")&lt;/P&gt;&lt;P&gt;df1.registerTempTable("df1_tbl")&lt;/P&gt;&lt;P&gt;df2.registerTempTable("df2_tbl")&lt;/P&gt;&lt;P&gt;Now join df1_tbl &amp;amp; df2_tbl using joinkey1 &amp;amp; joinkey2.&lt;/P&gt;</description>
      <pubDate>Wed, 14 Jun 2017 21:04:30 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205789#M62701</guid>
      <dc:creator>anatva</dc:creator>
      <dc:date>2017-06-14T21:04:30Z</dc:date>
    </item>
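Why CLUSTER BY on the join key helps can be modelled in plain Python (an illustrative sketch, not Spark code): once both tables are hash-partitioned on the key, matching rows already sit in the same partition, so the join needs no cross-partition movement.

```python
# Plain-Python model of co-partitioning by the join key: rows with
# equal keys hash to the same partition in BOTH tables, so the join
# can proceed partition-by-partition with no shuffle.
def hash_partition(rows, key, num_partitions):
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts

t1 = [{"k": i, "a": i * 10} for i in range(8)]
t2 = [{"k": i, "b": i * 100} for i in range(8)]
p1 = hash_partition(t1, "k", 4)
p2 = hash_partition(t2, "k", 4)

# Every key of t1 in partition i finds its t2 match in the SAME partition i.
co_located = all(
    {r["k"] for r in p1[i]} <= {r["k"] for r in p2[i]} for i in range(4)
)
```

This is also why the gain disappears if one side is partitioned differently (or not at all): Spark must then re-shuffle that side to align the partitions before joining.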
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205790#M62702</link>
      <description>&lt;P&gt;Thanks for the feedback.&lt;/P&gt;&lt;P&gt;Broadcast variables are not really applicable in my case, as I have big tables.&lt;/P&gt;&lt;P&gt;Concerning filterPushdown, it did not bring any improvement; on the contrary, execution took longer.&lt;/P&gt;</description>
      <pubDate>Thu, 15 Jun 2017 14:27:28 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205790#M62702</guid>
      <dc:creator>jgourdet</dc:creator>
      <dc:date>2017-06-15T14:27:28Z</dc:date>
    </item>
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205791#M62703</link>
      <description>&lt;P&gt;Thank you for the feedback.&lt;/P&gt;&lt;P&gt;1. Increasing shuffle.partitions led to the error:&lt;/P&gt;&lt;P&gt;Total size of serialized results of 153680 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)&lt;/P&gt;&lt;P&gt;2. Using CLUSTER BY in the select reduced data shuffling from 250 GB to 1 GB, and execution time dropped from 13 min to 5 min, so it is a good gain.&lt;/P&gt;&lt;P&gt;However, I was expecting to be able to persist this bucketing so as to get minimal shuffling, but it seems that is not possible; Hive and Spark are not really compatible on this topic.&lt;/P&gt;</description>
      <pubDate>Thu, 15 Jun 2017 14:31:11 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205791#M62703</guid>
      <dc:creator>jgourdet</dc:creator>
      <dc:date>2017-06-15T14:31:11Z</dc:date>
    </item>
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205792#M62704</link>
      <description>&lt;P&gt;You can persist the data partitioned by using partitionBy(colName) while writing the DataFrame to a file. The next time you use that DataFrame, it won't cause shuffles.&lt;/P&gt;&lt;P&gt;There is a JIRA for the issue you mentioned, which is fixed in 2.2. You can still work around it by increasing spark.driver.maxResultSize.&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;A href="https://issues.apache.org/jira/browse/SPARK-12837"&gt;SPARK-12837&lt;/A&gt;&lt;/LI&gt;&lt;/OL&gt;</description>
      <pubDate>Fri, 16 Jun 2017 02:25:33 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205792#M62704</guid>
      <dc:creator>anatva</dc:creator>
      <dc:date>2017-06-16T02:25:33Z</dc:date>
    </item>
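What `partitionBy(colName)` does to the on-disk layout can be modelled in plain Python (an illustrative sketch, not Spark code; the helper name is made up): rows are grouped into one bucket per column value, mirroring the `col=value/` directory layout Spark writes, so a later job that filters or joins on that column can read only the buckets it needs.

```python
# Plain-Python model of DataFrame.write.partitionBy(col): rows are laid
# out grouped by the partition column, one bucket per distinct value,
# like the col=value/ directories Spark creates on disk.
from collections import defaultdict

def write_partitioned(rows, col):
    layout = defaultdict(list)  # "directory name" -> rows in that bucket
    for row in rows:
        layout[f"{col}={row[col]}"].append(row)
    return dict(layout)

rows = [{"region": "EU", "x": 1}, {"region": "US", "x": 2}, {"region": "EU", "x": 3}]
layout = write_partitioned(rows, "region")
```

Note that this partitions by column *value* (good for pruning reads), which is not the same as Hive-style bucketing by hash of the join key; persisting the latter in a way Spark's joins recognize is exactly the gap discussed earlier in the thread.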
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205793#M62705</link>
      <description>&lt;P&gt;How will I avoid a shuffle if I have to join the data frames on 2 join keys?&lt;/P&gt;&lt;P&gt;df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1,JOINKEY2")&lt;/P&gt;&lt;P&gt;df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY1,JOINKEY2")&lt;/P&gt;&lt;P&gt;df3 = sqlContext.sql("SELECT * FROM TABLE3 CLUSTER BY JOINKEY1,JOINKEY2")&lt;/P&gt;&lt;P&gt;df4 = df1.join(df2, (df1.JOINKEY1 == df2.JOINKEY1) &amp;amp; (df1.JOINKEY2 == df2.JOINKEY2), "inner")&lt;/P&gt;&lt;P&gt;df5 = df1.subtract(df4).union(df3)&lt;/P&gt;</description>
      <pubDate>Fri, 28 Jul 2017 15:19:06 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/205793#M62705</guid>
      <dc:creator>100ashok</dc:creator>
      <dc:date>2017-07-28T15:19:06Z</dc:date>
    </item>
    <item>
      <title>Re: How to reduce Spark shuffling caused by join with data coming from Hive</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/303835#M62706</link>
      <description>&lt;P&gt;Won't it result in shuffle spill without proper memory configuration in the Spark context?&lt;/P&gt;</description>
      <pubDate>Fri, 02 Oct 2020 07:46:43 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-reduce-Spark-shuffling-caused-by-join-with-data/m-p/303835#M62706</guid>
      <dc:creator>Janard</dc:creator>
      <dc:date>2020-10-02T07:46:43Z</dc:date>
    </item>
  </channel>
</rss>

