Part-1 : Join involving 24 billion X 1 to 8 million rows table

Super Collaborator

Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1

Nodes : 1 NN (8 x 1 TB HDD, 16 x 2.53 GHz cores, 48 GB RAM, RHEL 6.5) + 8 DN (8 x 600 GB HDD, 16 x 2.53 GHz cores, 75 GB RAM, RHEL 6.5). The nodes are connected by a 10-gig network

Tables information :

FactSampleValue : 24 Billion rows

DimSnapshot : 8 million

  • Currently, I have imported these tables in text format onto HDFS and have created plain/staging Hive external tables
  • Once the final table strategy is decided, I will create another set of FINAL Hive external tables and populate them with insert into FINAL.table select * from staging.table

The seemingly innocuous query :

select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id
where smapiname_ver = 'dist_1';

Probably a silly question to ask at this stage, but does the infrastructure seem sufficient?

The obvious Hive strategies + existing forum threads lead me to believe :

  • Partitioning + Skew

To check the cardinality and the skew, I executed the following queries on the vanilla/staging Hive table (as expected, it took ages 😛)

select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue; 
cardin_Snapshot_Id    cardin_SmapiName_ver    cardin_SmapiColName
7967285               2717                    45207

If the SmapiName_ver column is used as the partition key, it will create 2717 partitions, and I am unsure whether that is a sane number of partitions. However, the skew considerations below may help to reduce the partition count in some way (list bucketing or ??). I need to use dynamic partitioning.
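
A minimal sketch of what dynamic partitioning on SmapiName_ver could look like. The database names staging_db and final_db and the sample_value column are hypothetical (the full column list of FactSampleValue is not given in this post). Hive's default limits for dynamic partitions (1000 in total, 100 per node) are below 2717, so the sketch raises them; the value 3000 is an assumption, not a tuned setting.

```sql
-- Hypothetical final table: only snapshot_id, smapicolname and smapiname_ver
-- are known from the post; sample_value stands in for the remaining columns.
CREATE EXTERNAL TABLE final_db.factsamplevalue (
  snapshot_id  INT,
  smapicolname STRING,
  sample_value STRING
)
PARTITIONED BY (smapiname_ver STRING)
STORED AS ORC;

-- Allow fully dynamic partitions and raise the limits above 2717 (assumed values).
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions=3000;
SET hive.exec.max.dynamic.partitions.pernode=3000;

-- The partition column must come last in the SELECT list.
INSERT INTO TABLE final_db.factsamplevalue PARTITION (smapiname_ver)
SELECT snapshot_id, smapicolname, sample_value, smapiname_ver
FROM staging_db.factsamplevalue;
```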

I also suspected a skew in the SmapiName_ver column, hence, executed the following query :

select SmapiName_ver, count(1) SmapiName_ver_distrib from FactSampleValue group by SmapiName_ver;

I have not provided the actual string values in SmapiName_ver but attached is the distribution in descending order

skew-smapiname-ver-fsv.txt

  • Bucketing(normal or 'List Bucketing')

The smaller table (8 million rows) has an integer snapshot id as its PK in the source DB, hence I guess I can use buckets to split the snapshot ids in some uniform manner, but how many buckets?

I am unsure how to use bucketing for the bigger table. Based on the 'skew' information, can list bucketing be used? If yes, how?
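
For reference, the two DDL forms being asked about look roughly like this. This is only a syntax sketch: the bucket count of 16, the final_db database, the snapshot_name and sample_value columns, and the choice of 'dist_1' as a skewed value are all assumptions (the real heavy values are in the attached distribution file, not in this post).

```sql
-- Normal bucketing: dimension table hashed on its key into a fixed number of buckets.
-- 16 is an assumed value, not derived from the data.
CREATE TABLE final_db.dimsnapshot (
  snapshot_id   INT,
  snapshot_name STRING
)
CLUSTERED BY (snapshot_id) INTO 16 BUCKETS
STORED AS ORC;

-- On Hive 1.x this makes inserts honour the declared bucket count.
SET hive.enforce.bucketing=true;

-- List bucketing: rows with the listed skewed values get their own directories.
-- 'dist_1' is a placeholder for whichever values dominate the distribution.
CREATE TABLE final_db.factsamplevalue_skewed (
  snapshot_id   INT,
  smapiname_ver STRING,
  smapicolname  STRING,
  sample_value  STRING
)
SKEWED BY (smapiname_ver) ON ('dist_1')
STORED AS DIRECTORIES
STORED AS ORC;
```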

  • Format and compression

I have decided to proceed with the ORC format but, given the massive size of the dataset, shall I go with the default compression settings (I guess zlib will be used by default) or ??
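
One way to compare codecs before committing is to load the same slice of data twice and look at the resulting sizes. The sketch below assumes a hypothetical staging_db database and uses the 'dist_1' value from the query above purely as a convenient sample filter; the table names are made up for illustration.

```sql
-- Same sample written with ZLIB (the ORC default) ...
CREATE TABLE staging_db.fsv_sample_zlib
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB")
AS
SELECT * FROM staging_db.factsamplevalue WHERE smapiname_ver = 'dist_1';

-- ... and with SNAPPY, which usually compresses less but is cheaper on CPU.
CREATE TABLE staging_db.fsv_sample_snappy
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY")
AS
SELECT * FROM staging_db.factsamplevalue WHERE smapiname_ver = 'dist_1';

-- totalSize in the table parameters shows the on-disk footprint of each copy.
DESCRIBE FORMATTED staging_db.fsv_sample_zlib;
DESCRIBE FORMATTED staging_db.fsv_sample_snappy;
```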

  • Indexes

Not considering this at the moment - is it a sensible decision ?

  • Misc. params

I had a set of queries based on my limited knowledge of configuration :

  1. If I use bucketing on any of the tables, then during the data load into the ORC tables I need to set no. of reducers = no. of buckets. Is this also true during execution of Hive queries, i.e. can I simply rely on Tez without having to wonder what reducer count should be set?
  2. From the existing threads, I got several inputs like :
    1. The tables should be well sorted - does this mean that while inserting data into the final tables, I need to use some 'order by/sort by' clause?
  3. While flipping through the Hive doc., I came across the following statement :

     "Cartesian products are not executed in parallel, and they are not optimized in any way using MapReduce"

     Should I consider this for the given simple query? If yes, what do I need to do?
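
A rough sketch for points 2 and 3. The first statement shows a load that sorts rows within each reducer's output, assuming hypothetical staging_db and final_db databases and an unpartitioned final table with the same column layout as staging. The second simply asks Hive for the plan of the query from this post; since that query joins on an equality condition (t.snapshot_id = f.snapshot_id), it is an equi-join rather than a cartesian product, and the plan output is one way to confirm which join operator is actually chosen.

```sql
-- Sorted load: rows with the same snapshot_id go to the same reducer and are
-- written in snapshot_id order within each output file.
INSERT INTO TABLE final_db.factsamplevalue
SELECT *
FROM staging_db.factsamplevalue
DISTRIBUTE BY snapshot_id
SORT BY snapshot_id;

-- Inspect the plan of the original query without running it.
EXPLAIN
select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id
where smapiname_ver = 'dist_1';
```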

I am currently not focusing on the cluster tweaks like 'Have set the task memory settings correctly so you fully utilize the cluster during queries' suggested by the community. I will first fix the Hive side

12 REPLIES

Master Guru

"Isn't it's role limited to the data loading(ORC table creation) phase ?"

Oh one last comment here. Loading the data correctly is key to the performance of the queries. One thing to look out for is too many small files in the table location. That is deadly for performance. So correct loading is a major thing.
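
As an illustration of keeping small files under control, here is a sketch using Hive's merge settings on Tez plus ORC concatenation. The table name, the partition value and the size thresholds are assumptions for the example, not values recommended in this thread.

```sql
-- Ask Hive to merge small output files at the end of a Tez job.
SET hive.merge.tezfiles=true;
-- Trigger the merge when the average output file is below ~128 MB (assumed threshold).
SET hive.merge.smallfiles.avgsize=134217728;
-- Target size of the merged files, ~256 MB (assumed).
SET hive.merge.size.per.task=268435456;

-- For an ORC table that already has many small files, merge them in place.
-- final_db.factsamplevalue and the partition value are hypothetical.
ALTER TABLE final_db.factsamplevalue PARTITION (smapiname_ver='dist_1') CONCATENATE;
```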

Super Collaborator

@mlanciaux

As mentioned in the original query :

  1. DimSnapshot.snapshot_id is the PK of the DimSnapshot table, thus its count = no. of records in DimSnapshot, which is around 8 million

  2. I did the following :

CREATE TABLE factsamplevalue_snapshot AS SELECT snapshot_id, COUNT(*) FROM factsamplevalue GROUP BY snapshot_id;

which resulted in a table with 7914806 rows, sample data :

select * from factsamplevalue_snapshot limit 10;
OK
factsamplevalue_snapshot.snapshot_id    factsamplevalue_snapshot._c1
643438  2170
643445  2023
643924  3646
644063  2448
644153  2837
644459  848
644460  3713
644541  2080
645243  725
645599  852

Unfortunately, the histogram will return a huge no. of entries, so I cannot paste or provide the output.

Rising Star

Can you give me the top 50, the min, the max and the average? Also, did you try the query? What was the behaviour? The reason I am asking is that if your query runs very long using only a few reducers, for example, it may imply skew, and so one way to maximize usage of the cluster is to look at surrogate key creation.
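
Against the factsamplevalue_snapshot table created above, the requested figures could be obtained along these lines. The count column was left unnamed in the CTAS, so Hive called it _c1 and it has to be quoted with backticks; reading "top 50" as the 50 snapshot ids with the most rows is an assumption about what was meant.

```sql
-- Min, max and average number of fact rows per snapshot_id.
SELECT MIN(`_c1`) AS min_rows,
       MAX(`_c1`) AS max_rows,
       AVG(`_c1`) AS avg_rows
FROM factsamplevalue_snapshot;

-- The 50 snapshot ids with the most fact rows.
SELECT snapshot_id, `_c1` AS row_count
FROM factsamplevalue_snapshot
ORDER BY row_count DESC
LIMIT 50;
```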