Created 05-26-2016 09:04 AM
Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1
Nodes : 1 NN (8 x 1 TB HDD, 16 x 2.53 GHz cores, 48 GB RAM, RHEL 6.5) + 8 DN (8 x 600 GB HDD, 16 x 2.53 GHz cores, 75 GB RAM, RHEL 6.5). Nodes are connected by a 10-gig network.
See the background thread for detailed information.
Tables information :
FactSampleValue : 24 Billion rows
DimSnapshot : 8 million rows
The seemingly innocuous query :
select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id where smapiname_ver = 'dist_1';
To check the cardinality and the skew, I executed the following query on the vanilla/staging Hive table (as expected, it took ages 😛):
select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue;
cardin_Snapshot_Id | cardin_SmapiName_ver | cardin_SmapiColName
7967285            | 2717                 | 45207
I created an empty external orc table as follows :
CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue (`Snapshot_Id` int, `ECU_Id` int, ... OTHER COLUMNS ...) PARTITIONED BY (`SmapiName_ver` varchar(30)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';
My thoughts on the above table :
Following is the INSERT query that I have thought about :
INSERT INTO odp_dw_may2016_orc.FactSampleValue PARTITION (SmapiName_ver) SELECT * FROM odp_dw_may2016.FactSampleValue DISTRIBUTE BY ? SORT BY Snapshot_Id DESC
My thoughts :
How shall I load the data into the final ORC table?
Created 05-26-2016 10:05 AM
"SORT BY is required to speed up the search"
Correct. With SORT BY, each output file will be sorted by your sort condition, so ORC can skip whole blocks (stripes) of data based on WHERE conditions.
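For example, something like this (just a minimal sketch, not from this thread: the table name and filter_col are placeholders for an ORC table whose files are sorted by the column your WHERE clause filters on; the two settings are standard Hive options):

set hive.optimize.ppd=true;           -- push predicates down to the storage layer
set hive.optimize.index.filter=true;  -- let the ORC reader consult its min/max stripe statistics
-- stripes whose filter_col min/max range cannot contain 42 are skipped without being read
SELECT count(*) FROM FactSampleValue_ORC WHERE filter_col = 42;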
"DISTRIBUTE BY to create less no. of output files(1-10 or more?,"
DISTRIBUTE BY works very similarly to buckets (you may be better off with buckets if you don't understand it well; however, DISTRIBUTE BY gives you more flexibility and avoids some of the problems that come with buckets). Essentially, DISTRIBUTE BY forces a reducer stage with a shuffle and distributes data by the key. So if you distribute by smapiname_ver, all rows with the same smapiname end up in the same output file. Also, if you distribute by the partition key, you can make sure that each reducer writes to only a single output file. Together with forcing the number of reducers, you essentially have the same power as buckets (and more flexibility). But again, if you don't understand it, you might be better off with sorted buckets and the optimized sorted load.
"The JOIN is on the integer column Snapshot_Id, hence, SORT BY Snapshot_Id"
Hmmm no, you do not filter by snapshot_id; you still need all of them, so predicate pushdown doesn't help you there.
"Is it necessary that the DISTRIBUTE BY column has LOW CARDINALITY"
No, you define the number of files with the number of reducers; DISTRIBUTE BY decides how data is distributed between them. Make sure that the partition key is part of the DISTRIBUTE BY, plus any other key you want to add WHERE conditions on (ideally still allowing parallelism).
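Putting the pieces above together, a rough sketch of such a load could look like the following (the reducer count and sort_col are placeholder choices for illustration, not a recommendation from this thread; sort_col should be a column your WHERE clauses actually filter on):

set hive.exec.dynamic.partition.mode=nonstrict;
set mapred.reduce.tasks=10;                      -- roughly 10 output files per partition
INSERT INTO odp_dw_may2016_orc.FactSampleValue PARTITION (SmapiName_ver)
SELECT * FROM odp_dw_may2016.FactSampleValue     -- partition column must be the last column selected
DISTRIBUTE BY SmapiName_ver                      -- rows of one smapiname all land in the same reducer/file
SORT BY sort_col;                                -- each reducer's output file is sorted, enabling stripe skipping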
Created 05-26-2016 10:27 AM
So if you distribute by smapiname_ver, all rows with the same smapiname end up in the same output file. Also, if you distribute by the partition key, you can make sure that each reducer writes to only a single output file.
Does this mean that I have to explicitly set the no. of reducers at the Hive prompt? Is it mandatory for the CORRECT insertion of data?
set mapred.reduce.tasks=; (what value shall I provide?)
But again, if you don't understand it, you might be better off with sorted buckets and the optimized sorted load.
Does this mean something like this :
set hive.enforce.sorting=true;
set hive.enforce.bucketing=true;
set hive.optimize.sort.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=3000;
AND
CREATE EXTERNAL TABLE IF NOT EXISTS FactSampleValue_ORC (`Snapshot_Id` int, `ECU_Id` int, ... OTHER COLUMNS ...) PARTITIONED BY (`SmapiName_ver` varchar(30)) CLUSTERED BY (SmapiName_ver) INTO 256 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS ORC LOCATION '/hiveorc';
Hmmm no, you do not filter by snapshot_id; you still need all of them, so predicate pushdown doesn't help you there.
+
Make sure that the partition key is part of the distribute by and any other key you want to add where conditions on.
Unfortunately, there is only one where condition (where smapiname_ver = 'dist_1'), so I am left with only the one column on which partitioning is already planned.
Does this mean something like this :
INSERT INTO FactSampleValue_ORC PARTITION (SmapiName_ver) SELECT * FROM FactSampleValue DISTRIBUTE BY SmapiName_ver SORT BY ?;
Created 05-29-2016 10:14 PM
"Does this mean that I have to explicitly set the no. of reducers on the Hive prompt ? Is it mandatory for the CORRECT insertion of data ?
It's not mandatory for correct insertion, but it matters for performance. If you have a hundred reducers, you get a hundred files, with the smapis divided between them (all values for one smapi ending up in the same file); if you have 10, you will have ten files. So there is a direct correlation with load speed (and, to a lesser extent, query performance as well).
and yeah buckets might be your better bet
"Unfortunately, there is only one where condition(where smapiname_ver ='dist_1'), so I am left only with one column on which partitioning is already considered."
So once you use buckets, you don't use DISTRIBUTE BY anymore; it's either/or. The sort you specify in the table definition.
See how they specify the SORTED BY keyword in the table definition? If you then load data into it, Hive will do the distribute/sort stuff itself.
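For illustration, a hedged sketch of that bucketed-and-sorted approach (the bucket/sort column Snapshot_Id, the bucket count, and the shortened column list are assumptions made for this example, not the thread's final answer; Snapshot_Id is used only because it is the join key and a non-partition column, and whether to sort by it is exactly the open question above):

set hive.enforce.bucketing=true;              -- shuffle rows into the declared buckets on insert
set hive.enforce.sorting=true;                -- sort each bucket as declared in the table definition
set hive.exec.dynamic.partition.mode=nonstrict;

CREATE TABLE FactSampleValue_ORC (
  `Snapshot_Id` int,
  `ECU_Id` int
  -- ... other columns ...
)
PARTITIONED BY (`SmapiName_ver` varchar(30))
CLUSTERED BY (Snapshot_Id) SORTED BY (Snapshot_Id) INTO 32 BUCKETS
STORED AS ORC;

-- a plain insert is then enough: Hive does the distribute/sort work itself
INSERT INTO FactSampleValue_ORC PARTITION (SmapiName_ver)
SELECT * FROM FactSampleValue;   -- partition column must be the last column selected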