<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168831#M29784</link>
    <description>&lt;P&gt;"&lt;STRONG&gt;SORT BY&lt;/STRONG&gt; is required to speed up the search"&lt;/P&gt;&lt;P&gt;Correct. With SORT BY, each output file is sorted by your sort key, so ORC can use its per-stripe min/max statistics to skip whole blocks (stripes) of data based on WHERE conditions.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"DISTRIBUTE BY to create less no. of output files(1-10 or more?,"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;DISTRIBUTE BY works much like bucketing. (You may be better off with buckets if you don't understand DISTRIBUTE BY well; however, it gives you more flexibility and avoids some of the problems that come with buckets.) Essentially, DISTRIBUTE BY forces a reduce stage with a shuffle and distributes the rows among the reducers by the key. So if you distribute by smapiname_ver, all rows with the same smapiname_ver end up in the same output file. Also, if you distribute by the partition key, each partition is written by only one reducer, so you get one file per partition instead of every reducer writing a small file into every partition. Together with setting the number of reducers, this gives you essentially the same power as buckets (and more flexibility). But again, if you don't understand it, you might be better off with sorted buckets and the optimized sorted load.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Hmm, no. You do not filter by snapshot_id; you still need all of them, so predicate pushdown doesn't help you there.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;No. You define the number of files with the number of reducers; DISTRIBUTE BY decides how the data is distributed between them. Make sure that the partition key is part of the DISTRIBUTE BY, along with any other key you want to put WHERE conditions on (ideally while still allowing parallelism).&lt;/P&gt;</description>
    <pubDate>Thu, 26 May 2016 17:05:24 GMT</pubDate>
    <dc:creator>bleonhardi</dc:creator>
    <dc:date>2016-05-26T17:05:24Z</dc:date>
    <item>
      <title>Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168830#M29783</link>
      <description>&lt;P&gt;Stack: HDP-2.3.2.0-2950, installed using Ambari 2.1&lt;/P&gt;&lt;P&gt;Nodes: 1 NN (8 x 1 TB HDD, 16 x 2.53 GHz cores, 48 GB RAM, RHEL 6.5) + 8 DN (8 x 600 GB HDD, 16 x 2.53 GHz cores, 75 GB RAM, RHEL 6.5). The nodes are connected by a 10-gigabit network.&lt;/P&gt;&lt;P&gt;See the &lt;A href="https://community.hortonworks.com/questions/34866/part-1-join-involving-24-billion-x-1-to-8-million.html"&gt;background thread&lt;/A&gt; for detailed information.&lt;/P&gt;&lt;P&gt;Table information:&lt;/P&gt;&lt;P&gt;FactSampleValue: 24 billion rows&lt;/P&gt;&lt;P&gt;DimSnapshot: 8 million rows&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Currently, I have imported these tables in text format onto HDFS and created plain/staging Hive external tables.&lt;/LI&gt;&lt;LI&gt;Once the final table strategy is decided, I will create another set of FINAL Hive external tables and populate them with insert into FINAL.table select * from staging.table&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;The &lt;STRONG&gt;seemingly&lt;/STRONG&gt; innocuous query:&lt;/P&gt;&lt;PRE&gt;select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id where smapiname_ver = 'dist_1';&lt;/PRE&gt;&lt;P&gt;To check the cardinality and the skew, I executed the following query on the vanilla/staging Hive table (as expected, it took ages 😛):&lt;/P&gt;&lt;PRE&gt;select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue;&lt;/PRE&gt;&lt;TABLE&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TD&gt;cardin_Snapshot_Id&lt;/TD&gt;&lt;TD&gt;cardin_SmapiName_ver&lt;/TD&gt;&lt;TD&gt;cardin_SmapiColName&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;7967285&lt;/TD&gt;&lt;TD&gt;2717&lt;/TD&gt;&lt;TD&gt;45207&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;&lt;P&gt;I created an &lt;STRONG&gt;empty external ORC&lt;/STRONG&gt; table as follows:&lt;/P&gt;&lt;PRE&gt;CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue (
`Snapshot_Id` int ,
`ECU_Id` int ,
.
OTHER COLUMNS
.
)
PARTITIONED BY (`SmapiName_ver` varchar(30))
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';&lt;/PRE&gt;&lt;P&gt;My thoughts on the above table:&lt;/P&gt;&lt;UL&gt;
&lt;LI&gt;Since the only WHERE condition is on SmapiName_ver, PARTITION BY SmapiName_ver&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Following is the INSERT query I have in mind:&lt;/P&gt;&lt;PRE&gt;INSERT INTO odp_dw_may2016_orc.FactSampleValue PARTITION (SmapiName_ver) SELECT * FROM odp_dw_may2016.FactSampleValue DISTRIBUTE BY ? SORT BY Snapshot_Id DESC;&lt;/PRE&gt;&lt;P&gt;My thoughts:&lt;/P&gt;&lt;UL&gt;
&lt;LI&gt;As per my understanding and the community input, &lt;STRONG&gt;SORT BY&lt;/STRONG&gt; is required to speed up the search and &lt;STRONG&gt;DISTRIBUTE BY to create less no. of output files(1-10 or more?, I don't really understand this concept)&lt;/STRONG&gt;, thereby speeding up the search.&lt;/LI&gt;&lt;LI&gt;The &lt;STRONG&gt;JOIN&lt;/STRONG&gt; is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id.&lt;/LI&gt;&lt;LI&gt;I can't decide on a value for DISTRIBUTE BY and have the following questions:
&lt;UL&gt;
&lt;LI&gt;Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY?&lt;/LI&gt;&lt;LI&gt;Is it crucial for READ performance that the query uses the DISTRIBUTE BY column in its WHERE/JOIN condition?&lt;/LI&gt;&lt;LI&gt;The only OTHER column with low cardinality, &lt;STRONG&gt;BUT NOT USED in the query&lt;/STRONG&gt;, is ECU_Id (int), which has 44 DISTINCT VALUES.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;How should I load the data into the final ORC table?&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2016 16:04:40 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168830#M29783</guid>
      <dc:creator>kaliyugantagoni</dc:creator>
      <dc:date>2016-05-26T16:04:40Z</dc:date>
    </item>
    <item>
      <title>Re: Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168831#M29784</link>
      <description>&lt;P&gt;"&lt;STRONG&gt;SORT BY&lt;/STRONG&gt; is required to speed up the search"&lt;/P&gt;&lt;P&gt;Correct. With SORT BY, each output file is sorted by your sort key, so ORC can use its per-stripe min/max statistics to skip whole blocks (stripes) of data based on WHERE conditions.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"DISTRIBUTE BY to create less no. of output files(1-10 or more?,"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;DISTRIBUTE BY works much like bucketing. (You may be better off with buckets if you don't understand DISTRIBUTE BY well; however, it gives you more flexibility and avoids some of the problems that come with buckets.) Essentially, DISTRIBUTE BY forces a reduce stage with a shuffle and distributes the rows among the reducers by the key. So if you distribute by smapiname_ver, all rows with the same smapiname_ver end up in the same output file. Also, if you distribute by the partition key, each partition is written by only one reducer, so you get one file per partition instead of every reducer writing a small file into every partition. Together with setting the number of reducers, this gives you essentially the same power as buckets (and more flexibility). But again, if you don't understand it, you might be better off with sorted buckets and the optimized sorted load.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Hmm, no. You do not filter by snapshot_id; you still need all of them, so predicate pushdown doesn't help you there.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;"Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY"&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;No. You define the number of files with the number of reducers; DISTRIBUTE BY decides how the data is distributed between them. Make sure that the partition key is part of the DISTRIBUTE BY, along with any other key you want to put WHERE conditions on (ideally while still allowing parallelism).&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2016 17:05:24 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168831#M29784</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-05-26T17:05:24Z</dc:date>
    </item>
    <item>
      <title>Re: Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168832#M29785</link>
      <description>&lt;P&gt;&lt;STRONG&gt;So if you distribute by smapiname_ver you would have all values with the same smapiname in the same output file. Also if you distribute with the partition key you can make sure that each reducer only writes to a single output file.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Does this mean that I have to &lt;STRONG&gt;explicitly set the no. of reducers on the Hive prompt&lt;/STRONG&gt;? Is it mandatory for the CORRECT insertion of data?&lt;/P&gt;&lt;PRE&gt;set mapred.reduce.tasks=; (what value shall I provide?)&lt;/PRE&gt;&lt;P&gt;&lt;STRONG&gt;But again if you don't understand it you might be better off with buckets sorted and the optimized sorted load.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Does this mean something like this:&lt;/P&gt;&lt;PRE&gt;set hive.enforce.sorting=true;
set hive.enforce.bucketing=true;
set hive.optimize.sort.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=3000;&lt;/PRE&gt;&lt;P&gt;&lt;STRONG&gt;AND&lt;/STRONG&gt;&lt;/P&gt;&lt;PRE&gt;CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue_ORC (`Snapshot_Id` int, `ECU_Id` int, . OTHER COLUMNS .)
PARTITIONED BY (`SmapiName_ver` varchar(30))
CLUSTERED BY (SmapiName_ver) INTO 256 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';
&lt;/PRE&gt;&lt;P&gt;&lt;STRONG&gt;Hmmm no, you do not filter by the snapshot_id, you still need all of them so the predicate pushdown doesn't help you there.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;+&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Make sure that the partition key is part of the distribute by and any other key you want to add where conditions on.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Unfortunately, there is only one where condition (where smapiname_ver ='dist_1'), so I am left with only one column, on which partitioning is already considered.&lt;/P&gt;&lt;P&gt;Does this mean something like this:&lt;/P&gt;&lt;PRE&gt;INSERT INTO FactSampleValue_ORC PARTITION (SmapiName_ver) SELECT * FROM FactSampleValue DISTRIBUTE BY SmapiName_ver SORT BY ?;&lt;/PRE&gt;</description>
      <pubDate>Thu, 26 May 2016 17:27:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168832#M29785</guid>
      <dc:creator>kaliyugantagoni</dc:creator>
      <dc:date>2016-05-26T17:27:23Z</dc:date>
    </item>
    <item>
      <title>Re: Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168833#M29786</link>
      <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/5134/kaliyugantagonist.html" nodeid="5134"&gt;@Kaliyug Antagonist&lt;/A&gt; &lt;/P&gt;&lt;P&gt;"Does this mean that I have to &lt;STRONG&gt;explicitly set the no. of reducers on the Hive prompt&lt;/STRONG&gt;? Is it mandatory for the CORRECT insertion of data?"&lt;/P&gt;&lt;P&gt;It's not mandatory for correct insertion, but it matters for performance. If you use a hundred reducers, you get a hundred files with the smapis divided between them (all values for one smapi ending up in the same file); if you use 10, you get ten files. So there is a direct correlation with load speed (and, to a lesser extent, with query performance as well).&lt;/P&gt;&lt;P&gt;And yes, buckets might be your better bet.&lt;/P&gt;&lt;P&gt;"Unfortunately, there is only one where condition (where smapiname_ver ='dist_1'), so I am left with only one column, on which partitioning is already considered."&lt;/P&gt;&lt;P&gt;Once you use buckets, you don't use DISTRIBUTE BY anymore; it's either/or. Instead, you specify the bucketing and sort order in the table definition:&lt;/P&gt;&lt;P&gt;&lt;A href="https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables"&gt;https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables&lt;/A&gt;&lt;/P&gt;&lt;P&gt;See how they specify the SORTED BY keyword in the table definition? If you then load data into the table, Hive does the distribute/sort work itself (provided hive.enforce.bucketing and hive.enforce.sorting are set to true, as in your settings above).&lt;/P&gt;</description>
      <pubDate>Mon, 30 May 2016 05:14:04 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Part-2-Join-involving-24-billion-X-1-to-8-million-rows-table/m-p/168833#M29786</guid>
      <dc:creator>bleonhardi</dc:creator>
      <dc:date>2016-05-30T05:14:04Z</dc:date>
    </item>
  </channel>
</rss>

