Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Part-2 : Join involving 24 billion X 1 to 8 million rows table - how to DISTRIBUTE BY

avatar
Super Collaborator

Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1

Nodes : 1 NN(8 X 1TB hdd, 16 X 2.53 GHz core processor,48GB RAM, RHEL 6.5) + 8 DN(8 X 600GB hdd, 16 X 2.53 GHz core processor, 75GB RAM, RHEL 6.5). Nodes are connected by a 10-gig network

The background thread for detailed information.

Tables information :

FactSampleValue : 24 Billion rows

DimSnapshot : 8 million

  • Currently, I have imported these tables in text format onto HDFS and have created plain/staging Hive external tables
  • Once the final table strategy is decided, I will create another set of FINAL Hive external tables and populate them with insert into FINAL.table select * from staging.table

The seemingly innocuous query :

select f.*from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id where smapiname_ver ='dist_1';

To check the cardinality and the skew , I executed the following queries on the vanilla/staging Hive table(as expected, it took ages 😛 )

select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue;
cardin_Snapshot_Idcardin_SmapiName_vercardin_SmapiColName
7967285271745207

I created an empty external orc table as follows :

CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue (
`Snapshot_Id` int ,
`ECU_Id` int ,
.
OTHER COLUMNS
.
)
PARTITIONED BY (`SmapiName_ver` varchar(30))
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';

My thoughts on the above table :

  • Since the only where condition is having SmapiName_ver , PARTITION BY SmapiName_ver

Following is the INSERT query that I have thought about :

INSERT INTO odp_dw_may2016_orc.FactSampleValue PARTITION (SmapiName_ver) SELECT * FROM odp_dw_may2016.FactSampleValue DISTRIBUTE BY ? SORT BY Snapshot_Id DESC

My thoughts :

  • As per my understanding and the community inputs, SORT BY is required to speed up the search and DISTRIBUTE BY to create less no. of output files(1-10 or more?, I don't really understand this concept), thereby, speeding up the search
  • The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id
  • My brain fails to decide a value DISTRIBUTE BY, I have the following queries :
    • Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY
    • Is it crucial for the READ performance that the query has in the where/join condition the DISTRIBUTE BY column
    • The only OTHER column with low cardinality BUT NOT USED in the query is ECU_Id(int) which has 44 DISTINCT VALUES

How shall I load the data in the final ORC table

1 ACCEPTED SOLUTION

avatar
Master Guru

"SORT BY is required to speed up the search"

correct, with sort by each output file will be sorted by your sort condition, so ORC can skip whole blocks ( stripes ) of data based on where conditions

"DISTRIBUTE BY to create less no. of output files(1-10 or more?,"

Distribute by works very similar to buckets. ( you may be better off with buckets in case you don't understand it well however it gives you more flexibility and skips some of the problems that come with buckets ). Essentially distribute by forces a reducer with a shuffle and distributes data by the key. So if you distribute by smapiname_ver you would have all values with the same smapiname in the same output file. Also if you distribute with the partition key you can make sure that each reducer only writes to a single output file. Together with forcing the number of reducers you have essentially a similar power to buckets ( and more flexibility ) . But again if you don't understand it you might be better off with buckets sorted. and the optimized sorted load.

"The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id"

Hmmm no, you do not filter by the snapshot_id, you still need all of them so the predicate pushdown doesn't help you thee.

"Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY"

No you define the number of files with the number of reducers, the distribute by decides how data is distributed between them. Make sure that the partition key is part of the distribute by and any other key you want to add where conditions on. ( ideally still allowing parallelity.

View solution in original post

3 REPLIES 3

avatar
Master Guru

"SORT BY is required to speed up the search"

correct, with sort by each output file will be sorted by your sort condition, so ORC can skip whole blocks ( stripes ) of data based on where conditions

"DISTRIBUTE BY to create less no. of output files(1-10 or more?,"

Distribute by works very similar to buckets. ( you may be better off with buckets in case you don't understand it well however it gives you more flexibility and skips some of the problems that come with buckets ). Essentially distribute by forces a reducer with a shuffle and distributes data by the key. So if you distribute by smapiname_ver you would have all values with the same smapiname in the same output file. Also if you distribute with the partition key you can make sure that each reducer only writes to a single output file. Together with forcing the number of reducers you have essentially a similar power to buckets ( and more flexibility ) . But again if you don't understand it you might be better off with buckets sorted. and the optimized sorted load.

"The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id"

Hmmm no, you do not filter by the snapshot_id, you still need all of them so the predicate pushdown doesn't help you thee.

"Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY"

No you define the number of files with the number of reducers, the distribute by decides how data is distributed between them. Make sure that the partition key is part of the distribute by and any other key you want to add where conditions on. ( ideally still allowing parallelity.

avatar
Super Collaborator

So if you distribute by smapiname_ver you would have all values with the same smapiname in the same output file. Also if you distribute with the partition key you can make sure that each reducer only writes to a single output file.

Does this mean that I have to explicitly set the no. of reducers on the Hive prompt ? Is it mandatory for the CORRECT insertion of data ?

set mapred.reduce.tasks=; (what value shall I provide?)

But again if you don't understand it you might be better off with buckets sorted. and the optimized sorted load.

Does this mean something like this :

set hive.enforce.sorting=true;
set hive.enforce.bucketing=true;
set optimize.sort.dynamic.partitioning=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=3000;

AND

CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue_ORC (`Snapshot_Id` int ,`ECU_Id` int ,.OTHER COLUMNS.)PARTITIONED BY (`SmapiName_ver` varchar(30)) CLUSTERED BY SmapiName_ver INTO 256 BUCKETS      ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';

Hmmm no, you do not filter by the snapshot_id, you still need all of them so the predicate pushdown doesn't help you thee.

+

Make sure that the partition key is part of the distribute by and any other key you want to add where conditions on.

Unfortunately, there is only one where condition(where smapiname_ver ='dist_1'), so I am left only with one column on which partitioning is already considered.

Does this mean something like this :

INSERT INTO FactSampleValue_ORC PARTITION (SmapiName_ver) SELECT * FROM FactSampleValue DISTRIBUTE BY SmapiName_ver SORT BY ?;

avatar
Master Guru

@Kaliyug Antagonist

"Does this mean that I have to explicitly set the no. of reducers on the Hive prompt ? Is it mandatory for the CORRECT insertion of data ?

Its not mandatory for the correct insertion but for the performance. If you have a hundred you have a hundred files and the smapis divided between them ( all values for one ending up in the same file ) if you have 10 you will have ten files. So there is a direct correlation with load speed ( and to a lesser extent query performance as well

and yeah buckets might be your better bet

"Unfortunately, there is only one where condition(where smapiname_ver ='dist_1'), so I am left only with one column on which partitioning is already considered."

So once you use buckets you don't use distribute by anymore its either or sort you specify it in the table definition

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSorted...

see how they specify the sorted by keyword in the table definition? If you then load data into it you hive will do the distribute/sort stiuff itself.