Part-1 : Join involving 24 billion X 1 to 8 million rows table

Super Collaborator

Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1

Nodes : 1 NN(8 X 1TB hdd, 16 X 2.53 GHz core processor,48GB RAM, RHEL 6.5) + 8 DN(8 X 600GB hdd, 16 X 2.53 GHz core processor, 75GB RAM, RHEL 6.5). Nodes are connected by a 10-gig network

Tables information :

FactSampleValue : 24 Billion rows

DimSnapshot : 8 million

  • Currently, I have imported these tables in text format onto HDFS and have created plain/staging Hive external tables
  • Once the final table strategy is decided, I will create another set of FINAL Hive external tables and populate them with insert into FINAL.table select * from staging.table

The seemingly innocuous query :

select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id
where smapiname_ver = 'dist_1';

Probably, a silly question to ask at this stage - does the infrastructure seem enough ?

The obvious Hive strategies + existing forum threads lead me to believe :

  • Partitioning + Skew

To check the cardinality and the skew, I executed the following queries on the vanilla/staging Hive table (as expected, it took ages 😛)

select count(distinct Snapshot_Id) cardin_Snapshot_Id, count(distinct SmapiName_ver) cardin_SmapiName_ver, count(distinct SmapiColName) cardin_SmapiColName FROM FactSampleValue; 
cardin_Snapshot_Id    cardin_SmapiName_ver    cardin_SmapiColName
7967285               2717                    45207

If the SmapiName_ver column is used for partitioning, it will create 2717 partitions, and I am unsure whether that is a sane number of partitions. However, the skew considerations below may help to reduce the number in some way (list bucketing or ??). I need to use dynamic partitioning.

I also suspected a skew in the SmapiName_ver column, hence, executed the following query :

select SmapiName_ver, count(1) SmapiName_ver_distrib from FactSampleValue group by SmapiName_ver;

I have not provided the actual string values in SmapiName_ver but attached is the distribution in descending order

skew-smapiname-ver-fsv.txt

  • Bucketing(normal or 'List Bucketing')

The smaller table (8 million rows) has an integer snapshot id as PK in the source DB, so I guess I can use buckets to split the snapshot ids in some uniform manner, but how many buckets?

I am unsure how to use bucketing for the bigger table. Based on the 'skew' information, can list bucketing be used? If yes, how?

  • Format and compression

I have decided to proceed with the ORC format but, given the massive size of the dataset, shall I go with the default compression settings (I guess zlib will be used by default) or ??

  • Indexes

Not considering this at the moment - is it a sensible decision ?

  • Misc. params

I had a set of queries based on my limited knowledge of configuration :

  1. If I use bucketing on any of the tables, then during data load into the ORC tables I need to specify no. of reducers = no. of buckets. Is this also true during execution of Hive queries, i.e. can I simply rely on Tez without having to wonder what reducer count should be set?
  2. From the existing threads, I got several inputs like :
    1. The tables should be well sorted - does this mean that while inserting data into the final tables, I need to use some 'order by/sort by' clause?
  3. While flipping through the Hive doc., I came across the following statement :
'Cartesian products are not executed in parallel, and they are not optimized in any way using MapReduce'

Should I consider this in the given simple query? If yes, what do I need to do?

I am currently not focusing on the cluster tweaks like 'Have set the task memory settings correctly so you fully utilize the cluster during queries' suggested by the community. I will first fix the Hive side

1 ACCEPTED SOLUTION

Master Guru

Probably, a silly question to ask at this stage - does the infrastructure seem enough ?

Hard to say. I had a not too dissimilar join: 25 nodes (more powerful than yours), 170b rows in one table, 7m rows in the other, a join on key, some filters and aggregations etc. Normally my queries were heavily filtered (on a single customer) and ran in seconds (however, everything was heavily partitioned). I also did a full join to create aggregation tables, and that recreated everything in an hour (but that is much more work since it recreates all ORCs).

So it will most likely work; the question is how restrictive your where condition is and what your performance requirements are.

Format and compression

Stay with zlib, it's normally better. ORC is heavily optimized for it. I once did a test with 170b rows joined to 50m rows, and ORC/zlib was in general the best one (as fast as snappy and much smaller).
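
A minimal sketch of what an ORC table with zlib compression could look like. The table name `factsamplevalue_orc` and the `sample_value` column are assumptions made for illustration; the thread only names `snapshot_id`, `smapiname_ver` and `smapicolname`, and the real column types are not given.

```sql
-- Hypothetical schema: only snapshot_id, smapiname_ver and smapicolname
-- are mentioned in the thread; sample_value is a placeholder column.
CREATE TABLE factsamplevalue_orc (
  snapshot_id    INT,
  smapiname_ver  STRING,
  smapicolname   STRING,
  sample_value   STRING
)
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "ZLIB");  -- ZLIB is also what ORC uses when nothing is set
```

Setting the property explicitly is optional, but it makes the choice visible in the DDL if someone later wants to compare against SNAPPY.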

Indexes

No. Rather, sort the data properly before inserting it (by your where condition). ORC has built-in indexes, so try sorting by smapiname_ver, for example. And load data into the tables with a good number of reducers so you get a couple of files, all sorted by the where condition.
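
As a rough sketch of such a sorted load: the target table `factsamplevalue_orc` is a hypothetical ORC table assumed to exist with the same column order as the staging table, and the reducer count of 64 is an arbitrary illustration, not a recommendation for this cluster.

```sql
-- Pin the number of reducers, and therefore the number of output files.
-- 64 is an illustrative value only.
SET mapreduce.job.reduces = 64;

-- SORT BY orders rows within each reducer's output, so every ORC file
-- ends up sorted by the column used in the where condition.
INSERT OVERWRITE TABLE factsamplevalue_orc
SELECT *
FROM factsamplevalue
SORT BY smapiname_ver;
```

Unlike ORDER BY, SORT BY does not force everything through a single reducer, which matters at 24 billion rows.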

Bucketing(normal or 'List Bucketing')

You can try that, but with 8m rows? It depends. If you can restrict the columns you may be able to do a full map-side join, which would be best by far. Let's say with 3-4 columns you would end up with 800MB, which would easily fit into memory. 100 columns? That would not work. In general I would first try it without bucketing; the Hive optimizer is pretty good (collect statistics first with ANALYZE TABLE ... COMPUTE STATISTICS, including the FOR COLUMNS variant).
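
A sketch of how the statistics and the map-side join attempt might look. The size threshold below is an assumed value chosen only to be larger than the 800MB estimate; it has to fit into the memory actually given to each task, which the thread does not specify.

```sql
-- Table-level and column-level statistics for the small table.
ANALYZE TABLE dimsnapshot COMPUTE STATISTICS;
ANALYZE TABLE dimsnapshot COMPUTE STATISTICS FOR COLUMNS;

-- Let Hive convert the join to a map-side join when the small side
-- is below the threshold (bytes). The value here is an assumption.
SET hive.auto.convert.join = true;
SET hive.auto.convert.join.noconditionaltask = true;
SET hive.auto.convert.join.noconditionaltask.size = 1000000000;

-- Restrict the small side to the columns the query needs.
SELECT f.*
FROM factsamplevalue f
JOIN (SELECT snapshot_id FROM dimsnapshot) t
  ON t.snapshot_id = f.snapshot_id
WHERE f.smapiname_ver = 'dist_1';
```

Running EXPLAIN on the query should show whether a map join was actually chosen.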

Misc. params

Not to toot my own horn, but I think my presentation has the most important pieces in there.

http://www.slideshare.net/BenjaminLeonhardi/hive-loading-data

Should I consider this in the given simple query? If yes, what I need to do ?

Cartesian joins (without a join condition) are the devil and should be avoided at all cost. But you don't have one; you have a join condition.

The tables should be well sorted - does this mean that while inserting data into final tables, I need to use some 'order by/sort by' clause

I think my presentation covers it well. Have a look through it and I am glad to answer any questions you might come up with.


12 REPLIES

Rising Star

Hello,

I can help you make this query work on your infrastructure.

Can you give me the number of elements per key (for t.snapshot_id = f.snapshot_id), i.e. how many times will I find the same key in both tables?

I am not sure indexes will help here at first.

There are several ways to optimize a Hive query (like creating a surrogate key), but we will check all options/parameters going forward.

Kind regards.

Super Collaborator

The source schema is a 'STAR schema' and the larger table is actually a fact table; hence, all the t.snapshot_id values will be present in the f.snapshot_id column.

Rising Star

I understand, but I would like to know if there is data skew. Telling me the max, the min and the avg would help, and perhaps a histogram too.

Super Collaborator

In the bigger table, yes, there is a skew. Please check the original question; I have also uploaded the row count per value of the SmapiName_ver column. As for the snapshot_id, it is a linear, incremental int field.

Which columns' max and min values would assist you?

Rising Star

Please run these:

SELECT snapshot_id, COUNT(*) FROM factsamplevalue GROUP BY snapshot_id

SELECT snapshot_id, COUNT(*) FROM DimSnapshot GROUP BY snapshot_id

and a histogram if you can get one, thanks
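
One possible way to get the min, max, average and a histogram of rows per key in a single pass, assuming Hive's built-in `histogram_numeric` aggregate. The bin count of 20 and the alias names are arbitrary choices for illustration.

```sql
-- Distribution of the number of fact rows per snapshot_id.
SELECT MIN(cnt)                   AS min_rows_per_key,
       MAX(cnt)                   AS max_rows_per_key,
       AVG(cnt)                   AS avg_rows_per_key,
       histogram_numeric(cnt, 20) AS rows_per_key_histogram  -- array of (x, y) bins
FROM (
  SELECT snapshot_id, COUNT(*) AS cnt
  FROM factsamplevalue
  GROUP BY snapshot_id
) per_key;
```

This still scans the full fact table once, so on the text-format staging table it will take about as long as the other counting queries.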

Super Collaborator
  • I read the article and now have some grip on how to proceed; however, I have some questions too :
  • How do I begin with the partitioning? There are 2719 distinct values for the column SmapiName_ver (can you check the attached file, which has the distribution of smapiname_ver?)
  • Regarding the use of partitions, there was a point 'No range partitioning/No continuous values'. In my case, the column smapiname_ver is a varchar column. How does that impact things?
  • Given the 'INSERT INTO ORC_TABLE SELECT * FROM STAGING_TABLE', I didn't understand the 'Loading data using distribution' section
  • I couldn't find the optimize.sort.dynamic.partitioning setting; is it the same as :

4439-dynamic-partitioning.png

  • Is the 'DISTRIBUTE BY' clause relevant (even in my example) when it comes to querying the large tables? Isn't its role limited to the data loading (ORC table creation) phase? If not, how do I use it?
  • I take 'For PPD, use SORT BY' to mean that we don't enforce a total order, and that the sort field should be something like a date or some integer. In my case, I guess I need to use the snapshot_id (the latest entry will have the highest snapshot_id). Correct me if I am mistaken.

Master Guru

Partitioning:

Hive doesn't have range partitioning. 2719 values is not too many (a couple thousand partitions works fine). However, you might have a bad distribution of values (does each smapiname have a similar number of rows?)

If the above doesn't work for you, you can do dirty tricks, i.e. hashing the smapiname into a set of partitions.

partition by ( hash(smapiname) % 64 ), for example. However, in this case you need to add a where condition to your queries that does this computation again.
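
A sketch of how the hash trick could be written out. Hive partitions on a column rather than on an expression, so the hash is materialized as an extra partition column; the table name `factsamplevalue_hashed`, the column `name_bucket` and the placeholder `sample_value` column are assumptions. `pmod` is used instead of `%` because `hash()` can return negative integers.

```sql
-- Hypothetical target table, partitioned by a derived bucket number.
CREATE TABLE factsamplevalue_hashed (
  snapshot_id    INT,
  smapiname_ver  STRING,
  smapicolname   STRING,
  sample_value   STRING      -- placeholder for the remaining columns
)
PARTITIONED BY (name_bucket INT)
STORED AS ORC;

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

-- The partition column comes last in the select list.
INSERT OVERWRITE TABLE factsamplevalue_hashed PARTITION (name_bucket)
SELECT snapshot_id, smapiname_ver, smapicolname, sample_value,
       pmod(hash(smapiname_ver), 64) AS name_bucket
FROM factsamplevalue;

-- Queries repeat the computation so only one partition is read,
-- and still filter on the real value because buckets are shared.
SELECT f.*
FROM factsamplevalue_hashed f
WHERE f.name_bucket = pmod(hash('dist_1'), 64)
  AND f.smapiname_ver = 'dist_1';
```

It is worth confirming with EXPLAIN that the partition filter is really applied, since that depends on the optimizer folding the constant expression.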

Given the 'INSERT INTO ORC_TABLE SELECT * FROM STAGING_TABLE', I didn't get what the 'Loading data using distribution' section

Please be more specific

Couldn't find the optimize.sort.dynamic.partitioning setting, is it same as :

It's neither. I would not set it globally (since this affects all loads); rather, do it in the load script with a set param=value;
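
For illustration, a session-level setting at the top of a load script could look like the line below. The full property name `hive.optimize.sort.dynamic.partition` is assumed here to be the setting the presentation refers to; the thread itself only gives the shortened name.

```sql
-- Applies only to this session/script, not to other loads on the cluster.
SET hive.optimize.sort.dynamic.partition = true;
```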

Is 'DISTRIBUTE BY' clause relevant(even in my example) when it comes to querying the large tables ? Isn't it's role limited to the data loading(ORC table creation) phase ?If not, how do I use it ?

It is relevant to loading because it defines the number of files (very much like bucketing). I used it because bucketing was buggy for me and it gave me more control over the ingestion.

'For PPD, use SORT BY' meaning we don't enforce a total order, also, this means a field like date or some integer. In my case, I guess I need to use the snapshot_id (latest entry will have the highest snapshot_id). Correct me if I am mistaken.

The good thing about sort by is that it sorts each file (or bucket), i.e. each reducer output. This is all you need for performance, since predicate pushdown only cares about the sorting within one file. Try to visualize how the data is structured in each file, how many tasks will run in the end, and how many blocks of data they can skip because of the sorting. It helps to write it down.
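
A small sketch of the query side, assuming a hypothetical ORC table `factsamplevalue_orc` that was loaded with SORT BY smapiname_ver. ORC keeps min/max values for each column per stripe and per row group, so when a file is sorted the filter value falls into only a few of those ranges and the rest can be skipped.

```sql
SET hive.optimize.ppd = true;            -- push the where condition down to the reader
SET hive.optimize.index.filter = true;   -- let ORC use its min/max indexes to skip data

SELECT f.*
FROM factsamplevalue_orc f               -- hypothetical table, sorted by smapiname_ver per file
WHERE f.smapiname_ver = 'dist_1';
```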

So how would I do it in your case? Try to partition by the smapiname or, if you have to, by a hash of it. If you partition by it directly, sorting doesn't help you much because you don't have a second where condition. If you partition by a hash, then sort by the name itself to at least skip some values. Make sure to use distribute by OR bucketing to create a couple of files (1-10 blocks is a good size). Distribute/bucket either by something OTHER than the smapiname (in this case you get parallel execution) or by the smapiname (in this case a single query will be slower, but multiple queries in parallel would be faster). Tradeoff again.
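
A rough sketch of the direct-partitioning variant with DISTRIBUTE BY controlling the file count. The table name `factsamplevalue_part`, the `sample_value` placeholder column, the limit of 5000, the factor 8 and the choice of `snapshot_id` as sort column are all assumptions for illustration.

```sql
CREATE TABLE factsamplevalue_part (
  snapshot_id   INT,
  smapicolname  STRING,
  sample_value  STRING       -- placeholder for the remaining columns
)
PARTITIONED BY (smapiname_ver STRING)
STORED AS ORC;

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
-- Roughly 2700 partitions are expected, so the limits need to be above that.
SET hive.exec.max.dynamic.partitions = 5000;
SET hive.exec.max.dynamic.partitions.pernode = 5000;

-- Rows of one partition are spread over at most 8 distribution keys,
-- so each partition gets at most 8 files, each sorted by snapshot_id.
INSERT OVERWRITE TABLE factsamplevalue_part PARTITION (smapiname_ver)
SELECT snapshot_id, smapicolname, sample_value, smapiname_ver
FROM factsamplevalue
DISTRIBUTE BY smapiname_ver, pmod(snapshot_id, 8)
SORT BY snapshot_id;
```

With the skew described in the question, a fixed factor gives very large files for the biggest values and tiny ones for the rare values, so the factor is a tradeoff rather than a fixed rule.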

Super Collaborator

'partitioned by ( hash(SmapiName_ver) % 64 )' and then sort by the name itself to at least skip some values

This is a good idea; there are only two things that worry me :

  • The SmapiName_ver column is varchar, and I am unsure how evenly/oddly the SmapiName_ver keys will be hashed into sets! Is there a way in Hive to figure out which hash a key will go to if you have the distinct key values with you?
  • As mentioned in my original question, SmapiName_ver has a heavy skew (the file skew-smapiname-ver-fsv.txt has the row count for each key in descending order). For example, the 'dist_1' key is present in 2 billion rows, 'ct_1' in 98 million, 'tm_1' in 8 million and so on. Now if 'dist_1' and 'ct_1' land in the same hash/partition, then instead of scanning 2 billion rows as in the case of partitioning by SmapiName_ver, there will now be 2 billion + 98 million rows to scan - please correct me if my understanding is incorrect
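
Both concerns can be checked before any data is reloaded. The queries below are a sketch that assumes the hash partition would be computed as `pmod(hash(smapiname_ver), 64)`; the alias names are made up.

```sql
-- Which bucket would a single key land in?
SELECT hash('dist_1')            AS raw_hash,
       pmod(hash('dist_1'), 64)  AS name_bucket;

-- How many keys and rows would each of the 64 buckets receive?
SELECT pmod(hash(smapiname_ver), 64)  AS name_bucket,
       COUNT(DISTINCT smapiname_ver)  AS keys_in_bucket,
       COUNT(*)                       AS rows_in_bucket
FROM factsamplevalue
GROUP BY pmod(hash(smapiname_ver), 64)
ORDER BY rows_in_bucket DESC;
```

The second query scans the whole fact table; if the per-key counts from the earlier skew query were saved into a small table, the same grouping could be run on that table instead by summing the saved counts.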

Make sure to use distribute by OR bucketing to create a couple of files ( 1-10 blocks is a good size ) and distribute bucket by something OTHER than the smapiname

Which column to use for distribute by is a question, as :

  • Snapshot_Id (int) has a high cardinality (it is the PK in the other, smaller table), i.e. 8 million distinct values. This is THE JOIN COLUMN; does it make sense to use distribute by on this?
  • Is it mandatory to have DISTRIBUTE BY on a column which will be used in the query (either WHERE or JOIN)?

If you partition by it directly sorting doesnt help you much because you dont have a second where condition

I didn't quite understand this statement

SORT BY :

  • How exactly does SORT BY help to skip records?
  • Does SORT BY sort data within each file created by DISTRIBUTE BY, or does it sort data within EACH PARTITION?
  • Given the query (select f.* from factsamplevalue f join DimSnapshot t on t.snapshot_id = f.snapshot_id where smapiname_ver = 'dist_1'), is it necessary that the SORT BY column be either snapshot_id or smapiname_ver?