
bucket map join


New Contributor

I have to join two large tables, so I am trying to use a sort merge bucket (SMB) map join. Both tables are partitioned on client ID; there are 50 clients. I have also bucketed both tables into 20 buckets on another ID field, which is used in the join.

When I execute the SMB join, I see 50 × 20 = 1,000 mappers. Please advise how to correct that.

9 REPLIES

Re: bucket map join

@sandeep agarwal

Make sure that you run stats on both tables. When you said "how to correct," I believe you want to reduce the number of mappers. Is that right?
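
For reference, gathering stats on both tables looks roughly like this (a sketch using the table names posted later in the thread; the unqualified PARTITION (clnt_id) spec assumes a Hive version that can analyze all partitions at once):

ANALYZE TABLE employee PARTITION (clnt_id) COMPUTE STATISTICS;
ANALYZE TABLE employee PARTITION (clnt_id) COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE employee_error PARTITION (clnt_id) COMPUTE STATISTICS;
ANALYZE TABLE employee_error PARTITION (clnt_id) COMPUTE STATISTICS FOR COLUMNS;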

Re: bucket map join

New Contributor

Neeraj, I am not looking at reducers currently; I am worried about the 1,000 mappers. Does a bucket map join only work this way?

I mean: if there are 50 partitions and I create 20 buckets, will it always create 1,000 mappers? Since both tables in the join are big, I have to use a bucket map join; otherwise Hive falls back to a common join.

I had already run stats on both tables.

Re: bucket map join

New Contributor

Has anybody worked on a bucket map join with partitioned tables?

Re: bucket map join

Guru

How large is your dataset? The number of mappers is based on your input splits, not the number of buckets. If you have a large amount of data, then one bucket may require multiple mappers.
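
For the common-join path, the mapper count can be influenced through the split-size settings; a minimal sketch, assuming the MR engine with CombineHiveInputFormat (the byte sizes are placeholders to tune):

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapreduce.input.fileinputformat.split.maxsize=268435456;   -- upper bound per split (256 MB)
set mapreduce.input.fileinputformat.split.minsize=134217728;   -- lower bound per split (128 MB)

Note this does not help once an SMB conversion kicks in, because each bucket file must then be read in order by its own mapper.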

Based on your question, though, I wonder whether your buckets were created properly. It sounds like you have a bucket per client ID. Are you able to share any of the CREATE TABLE code?

Re: bucket map join

Rising Star

Are you stuck on MR?

With partitioned tables, SMB join is not the fastest join once you get to Tez: the multi-file merge across partitions wastes a lot of CPU and actually limits speed by losing SIMD (because rows need to be merged one at a time, not in row-groups of 1024).

The Tez implementation of bucket map join, the dynamically partitioned hash join, was written as a specific way out of this problem for small-scale data (< 999 GB). The dynamic routing of the hash join allows you to split a big-table bucket, and the unordered reads allow for split grouping within a bucket.

http://www.slideshare.net/Hadoop_Summit/w-235phall...
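
For reference, the Tez-side conversion described above is driven by settings along these lines (a sketch; the threshold value is a placeholder, not a recommendation):

set hive.execution.engine=tez;
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=100000000;  -- small-table size threshold in bytes
set hive.convert.join.bucket.mapjoin.tez=true;                -- enable bucket map join on Tez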

Re: bucket map join

New Contributor

Hi Gopal, I need to join two big tables and then join the result with another big table. I bucketed all three big tables without partitioning them, and I was able to get good performance. Can you advise how to do this with partitioned tables on Tez? Can you provide some examples?

The document you shared is really good, but I need to understand this in more detail. Can you provide some detailed documentation on joins on Tez?

Re: bucket map join

New Contributor

Hi, sorry for being late on this. Yes, I am on MR only. If I run the plain SQL, it creates a common join and I see only 25 mappers, since it creates 25 splits. But if I set the sort merge bucket options below, I see 1,000 mappers (number of splits: 1000).

set hive.auto.convert.sortmerge.join=true;                    -- allow auto-conversion to SMB join
set hive.optimize.bucketmapjoin=true;                         -- enable bucket map join
set hive.optimize.bucketmapjoin.sortedmerge=true;             -- use the sorted-merge variant
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.enforce.bucketing=true;                              -- honor bucket count on insert
set hive.enforce.sorting=true;                                -- honor sort order on insert

I have also checked the explain plan; it shows "Sorted Merge Bucket Map Join Operator".

Here are the steps. Since the two tables were joined on multiple columns, I concatenated those columns into a single column, keycol, and made it the bucket/join key.
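
One caveat with building the key via plain concat: adjacent values can collide ('12' + '3' produces the same key as '1' + '23'). A delimiter-separated key avoids this; a sketch using concat_ws (the '|' delimiter is an arbitrary choice, and the casts are needed because concat_ws takes strings):

select concat_ws('|', cast(src_plfm_id as string), load_dt,
                 cast(emp_id as string), birth_day) as keycol
from employee_text;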

CREATE TABLE employee (
  keycol string,
  src_plfm_id int,
  emp_id int,
  birth_day string,
  age int,
  addr1 string,
  addr2 string,
  addr3 string,
  phn_no int,
  load_dt string)
PARTITIONED BY (
  clnt_id string)
CLUSTERED BY (keycol) SORTED BY (keycol ASC) INTO 20 BUCKETS
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
  LINES TERMINATED BY '\n'
STORED AS RCFILE;
(the source table employee_text is in textfile format)
from employee_text
insert overwrite table employee partition (clnt_id)
select concat(src_plfm_id, load_dt, emp_id, birth_day),
  src_plfm_id,
  emp_id,
  birth_day,
  age,
  addr1,
  addr2,
  addr3,
  phn_no,
  load_dt,
  clnt_id   -- dynamic partition column must come last in the select list
sort by concat(src_plfm_id, load_dt, emp_id, birth_day);

CREATE TABLE employee_error (
  keycol string,
  src_plfm_id int,
  emp_id int,
  birth_day string,
  error_done string,
  load_dt string)
PARTITIONED BY (
  clnt_id string)
CLUSTERED BY (keycol) SORTED BY (keycol ASC) INTO 20 BUCKETS
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
  LINES TERMINATED BY '\n'
STORED AS RCFILE;
from employee_error_text
insert overwrite table employee_error partition (clnt_id)
select concat(src_plfm_id, load_dt, emp_id, birth_day),
  src_plfm_id,
  emp_id,
  birth_day,
  error_done,
  load_dt,
  clnt_id   -- dynamic partition column must come last in the select list
sort by concat(src_plfm_id, load_dt, emp_id, birth_day);
create temporary table emp_error_temp as
select
  a.src_plfm_id,
  a.emp_id,
  a.birth_day,
  a.age,
  a.addr1,
  a.addr2,
  a.addr3,
  a.phn_no,
  a.load_dt,
  b.error_done
from employee a
join employee_error b
  on (a.keycol = b.keycol);

This query is creating 50 × 20 = 1,000 splits (and hence 1,000 mappers). Why?
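
As an aside, keycol does not include clnt_id, so rows from different clients could match whenever their concatenated fields coincide. If matches should stay within a client, the partition column can be added to the join condition; a sketch (not what the original poster ran, and the temp-table name is arbitrary):

create temporary table emp_error_within_client as
select
  a.src_plfm_id,
  a.emp_id,
  a.load_dt,
  b.error_done
from employee a
join employee_error b
  on a.keycol = b.keycol
 and a.clnt_id = b.clnt_id;  -- keep matches within the same client partition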

Re: bucket map join

Mentor

@sandeep agarwal can you accept the best answer to close this thread?


Re: bucket map join

New Contributor

You have 50 partitions and 20 buckets, so the total number of files is 50 × 20 = 1,000. When using a bucket map join, each bucket file is processed by its own mapper, even if it is smaller than the 64/128 MB block size. So it will always take 1,000 mappers.
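
To see those 1,000 files directly, the table's warehouse directory can be listed from the Hive CLI (the path below is an assumption; substitute your own warehouse location):

dfs -ls -R /apps/hive/warehouse/employee;  -- expect 50 partition dirs with 20 bucket files each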