Created 05-23-2016 09:48 AM
Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1
Nodes : 1 NN (8 x 1TB HDD, 16 x 2.53 GHz cores, 48GB RAM, RHEL 6.5) + 8 DN (8 x 600GB HDD, 16 x 2.53 GHz cores, 75GB RAM, RHEL 6.5). Nodes are connected by a 10-gig network.
Tables information :
FactSampleValue : 24 Billion rows
DimSnapshot : 8 million rows
The seemingly innocuous query :
select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id where smapiname_ver = 'dist_1';
Probably a silly question to ask at this stage - does the infrastructure seem sufficient?
The obvious Hive strategies + existing forum threads led me to the following:
To check the cardinality and the skew, I executed the following queries on the vanilla/staging Hive table (as expected, it took ages 😛):
select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue;
cardin_Snapshot_Id | cardin_SmapiName_ver | cardin_SmapiColName |
7967285 | 2717 | 45207 |
If the SmapiName_ver column is used for partitioning, it will create 2717 partitions, which I am unsure is a sane number of partitions; however, the skew considerations below may help to reduce the partitions in some way (list bucketing or something else?). I would need to use dynamic partitioning - a sketch of what I have in mind is below.
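Something like the following is what I have in mind (just a sketch: factsamplevalue_orc is a name I made up, the column list is abbreviated to the columns mentioned above, and the max-partitions setting is only an illustrative value):

-- Staging table is the existing text-format factsamplevalue; target is a partitioned ORC table.
CREATE TABLE factsamplevalue_orc (
  snapshot_id BIGINT,
  smapicolname STRING
  -- remaining fact columns omitted
)
PARTITIONED BY (smapiname_ver STRING)
STORED AS ORC;

-- Dynamic-partition insert from the staging table (the partition column goes last in the SELECT).
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=5000;
INSERT OVERWRITE TABLE factsamplevalue_orc PARTITION (smapiname_ver)
SELECT snapshot_id, smapicolname, smapiname_ver
FROM factsamplevalue;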
I also suspected skew in the SmapiName_ver column, hence executed the following query:
select SmapiName_ver, count(1) SmapiName_ver_distrib from FactSampleValue group by SmapiName_ver;
I have not provided the actual string values in SmapiName_ver, but attached is the distribution in descending order.
The smaller table (8 million rows) has an integer snapshot_id as PK in the source DB, hence I guess I can use buckets to split the snapshot ids in some uniform manner - but how many buckets?
I am unsure what to use for the bigger table. Based on the 'skew' information, can list bucketing be used? If yes, how? (Sketches of both options are below.)
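For reference, this is how I understand the two DDL options (a sketch only: the table names, the bucket count of 32, and the skewed values are placeholders, not measured choices):

-- Bucketing the small dimension table on its integer key; 32 buckets is just a guess.
CREATE TABLE dimsnapshot_orc (
  snapshot_id BIGINT
  -- remaining DimSnapshot columns omitted
)
CLUSTERED BY (snapshot_id) INTO 32 BUCKETS
STORED AS ORC;

-- List bucketing the fact table on the skewed column; the ON (...) values are placeholders
-- for whatever the distribution output shows as the heaviest SmapiName_ver values.
CREATE TABLE factsamplevalue_lb (
  snapshot_id BIGINT,
  smapiname_ver STRING,
  smapicolname STRING
  -- remaining fact columns omitted
)
SKEWED BY (smapiname_ver) ON ('heavy_value_1', 'heavy_value_2')
STORED AS DIRECTORIES
STORED AS ORC;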
I have decided to proceed with the ORC format, but given the massive size of the dataset, shall I go with the default compression settings (I guess ZLIB will be used by default) or something else?
I am not considering this at the moment - is that a sensible decision?
I had a set of questions based on my limited knowledge of configuration:
Cartesian products are not executed in parallel, and they are not optimized in any way using MapReduce
Should I consider this for the given simple query? If yes, what do I need to do?
I am currently not focusing on cluster tweaks like 'set the task memory settings correctly so you fully utilize the cluster during queries' suggested by the community. I will first fix the Hive side.
Created 05-23-2016 10:36 AM
Probably a silly question to ask at this stage - does the infrastructure seem sufficient?
Hard to say. I had a not too dissimilar join: 25 (more powerful than yours) nodes, 170b rows in one table, 7m rows in the other; a join on a key, some filters and aggregations, etc. Normally my queries were heavily filtered (on a single customer) and ran in seconds (however, everything was heavily partitioned). However, I also did a full join to create aggregation tables, and that recreated everything in about an hour (but that is much more work, since it rewrites all the ORC files).
So it will most likely work; the question is how restrictive your WHERE condition is and what your performance requirements are.
Format and compression
Stay with ZLIB; it's normally better, and ORC is heavily optimized for it. I did a test with 170b rows joined to 50m rows once, and ORC/ZLIB was in general the best option (as fast as Snappy and much smaller).
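For what it's worth, ZLIB is already the ORC default, so spelling it out mostly just documents the choice (the table name and columns below are only placeholders):

CREATE TABLE my_orc_table (
  snapshot_id BIGINT,
  smapiname_ver STRING
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB");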
Indexes
No. Rather, sort the data properly before inserting it (by your WHERE condition). ORC has built-in indexes, so try sorting by smapiname_ver, for example. (And load the data into the tables with a good set of reducers so you get a couple of files, all sorted by the WHERE condition.)
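A sketch of what I mean by loading sorted (factsamplevalue_sorted is a placeholder target table and the column list is abbreviated; the point is the DISTRIBUTE BY / SORT BY at load time):

-- Each reducer writes its own file, sorted by the filter column, so ORC's built-in
-- min/max indexes can skip stripes when you filter on smapiname_ver.
INSERT OVERWRITE TABLE factsamplevalue_sorted
SELECT snapshot_id, smapicolname, smapiname_ver
FROM factsamplevalue
DISTRIBUTE BY smapiname_ver
SORT BY smapiname_ver;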
Bucketing (normal or 'List Bucketing')
You can try that, but with 8m rows? It depends. If you can restrict the columns, you may be able to do a full map-side join, which would be best by far. With, say, 3-4 columns you would end up with around 800MB, which would easily fit into memory. 100 columns? That would not work. In general I would first try it without bucketing; the Hive optimizer is pretty good (use ANALYZE TABLE to compute table and column statistics).
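The statements I mean are along these lines (check them against your Hive version; the 800MB threshold is only an illustration):

-- Table and column statistics so the optimizer can size the tables and pick a map-side join.
ANALYZE TABLE DimSnapshot COMPUTE STATISTICS;
ANALYZE TABLE DimSnapshot COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE FactSampleValue COMPUTE STATISTICS;

-- Allow automatic conversion to a map-side join if the small table fits under the threshold.
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask.size=800000000;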
Misc. params
Not to toot my own horn, but I think my presentation has the most important pieces in there:
http://www.slideshare.net/BenjaminLeonhardi/hive-loading-data
Should I consider this for the given simple query? If yes, what do I need to do?
Cartesian joins (without a join condition) are the devil and should be avoided at all costs. But you don't have one - you have a join condition.
The tables should be well sorted - does this mean that while inserting data into final tables, I need to use some 'order by/sort by' clause?
I think my presentation covers it well. Have a look through it and I am glad to answer any questions you might come up with.
Created 05-23-2016 12:55 PM
"Isn't it's role limited to the data loading(ORC table creation) phase ?"
Oh, one last comment here. Loading the data correctly is key to the performance of the queries. One thing to look out for is too many small files in the table location - that is deadly for performance. So correct loading is a major thing.
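One way to guard against the small-file problem at load time is Hive's file-merge settings (the 256MB values below are just example sizes, not a recommendation):

-- Merge small output files at the end of the job (Tez engine on HDP).
SET hive.merge.tezfiles=true;
SET hive.merge.smallfiles.avgsize=256000000;
SET hive.merge.size.per.task=256000000;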
Created 05-30-2016 08:57 AM
As mentioned in the original query :
DimSnapshot table, thus its count = the number of records in DimSnapshot, which is around 8 million.
I did the following :
CREATE TABLE factsamplevalue_snapshot AS SELECT snapshot_id, COUNT(*) FROM factsamplevalue GROUP BY snapshot_id;
which resulted in a table with 7914806 rows; sample data:
select * from factsamplevalue_snapshot limit 10;
OK
factsamplevalue_snapshot.snapshot_id    factsamplevalue_snapshot._c1
643438    2170
643445    2023
643924    3646
644063    2448
644153    2837
644459    848
644460    3713
644541    2080
645243    725
645599    852
Unfortunately, the histogram returns a huge number of entries, so I cannot paste or provide the output.
Created 06-09-2016 10:34 AM
Can you give me the top 50, plus the min, max and average? Also, did you try the query? What was the behaviour? The reason I am asking is that if your query runs for a long time using only a small number of reducers, for example, that may imply skew, and one way to maximize usage of the cluster is to look at surrogate key creation.
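Something like this over the count table you already built should be enough (just a sketch; `_c1` is the auto-generated count column name from your CTAS):

-- Top 50 snapshot_ids by row count.
SELECT snapshot_id, `_c1` AS cnt
FROM factsamplevalue_snapshot
ORDER BY `_c1` DESC
LIMIT 50;

-- Min, max and average rows per snapshot_id.
SELECT MIN(`_c1`), MAX(`_c1`), AVG(`_c1`)
FROM factsamplevalue_snapshot;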