I have to join two large tables, so I am trying to use a sort merge bucket (SMB) map join. Both tables are partitioned on client ID. There are 50 clients, and I have bucketed both tables into 20 buckets on another ID field, which is the one used in the join.
When I execute the SMB join, I see 50*20 = 1000 mappers. Please advise how to correct this.
Neeraj, I am not looking at reducers currently; I am worried about the 1000 mappers. Does bucket map join always work this way?
I mean: if there are 50 partitions and I create 20 buckets, will it always create 1000 mappers? Since both tables in the join are big, I have to use bucket map join; otherwise Hive falls back to a common join.
I have already run stats on both tables.
How large is your dataset? The number of mappers is based on your Input Splits, not the number of buckets. If you have a large amount of data, then 1 bucket may require multiple mappers.
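To illustrate the split-size point: for a plain (common) join on MR, the mapper count is driven by the input-split settings rather than the bucket count. A minimal sketch, assuming the MR engine (the 256 MB value is illustrative, not a recommendation):

```sql
-- Combine small files into fewer, larger splits for a common join on MR.
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- illustrative max split size: 256 MB
set mapreduce.input.fileinputformat.split.maxsize=268435456;
```

Note that a bucket map join reads each bucket file as its own split, so these settings will not reduce the mapper count in that mode.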
Based on your question, though, I wonder if your buckets might not have been created properly. It sounds like you have a bucket per client ID. Are you able to share the CREATE TABLE code?
Are you stuck on MR?
With partitioned tables, SMB join is not the fastest join once you get to Tez: the multi-file merge across partitions results in a lot of wasted CPU and actually limits speed by losing SIMD (because rows need to be merged one at a time, not in row-groups of 1024).
The Tez implementation of bucket map join, the dynamically partitioned hash join, was written as a specific way out of this problem for small-scale data (<999 GB). The dynamic routing of the hash join allows you to split a big-table bucket, and the unordered reads allow for split-grouping within a bucket.
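The switches for trying the dynamically partitioned hash join described above look roughly like this. This is a sketch: the noconditionaltask.size threshold is an illustrative value and should be tuned to the size of the smaller side of the join.

```sql
set hive.execution.engine=tez;
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
-- illustrative threshold: build sides under ~1 GB become hash joins
set hive.auto.convert.join.noconditionaltask.size=1000000000;
-- enable the Tez bucket map join conversion
set hive.convert.join.bucket.mapjoin.tez=true;
```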
Hi Gopal, I need to join two big tables and then join the result with another big table. I bucketed all three big tables without partitioning them, and I was able to get good performance. Can you advise how to do this using partitioned tables on Tez? Can you provide some examples?
The document you shared is really good, but I need to understand this in more detail. Can you provide some detailed documentation on joins in Tez?
Hi, sorry for being late on this. Yes, I am on MR only. If I run the plain SQL, it creates a common join and I see only 25 mappers, as it creates 25 splits. But if I set the sort merge bucket options below, I see 1000 mappers (number of splits: 1000).
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
I have also checked the explain plan; it shows "Sorted Merge Bucket Map Join Operator".
Here are the steps: since both tables are joined on multiple columns, I concatenated those columns into one column, keycol, and created the bucket/sort key on that column.
CREATE TABLE employee (
  keycol string,
  src_plfm_id int,
  emp_id int,
  birth_day string,
  age int,
  addr1 string,
  addr2 string,
  addr3 string,
  phn_no int,
  load_dt string)
PARTITIONED BY (clnt_id string)
CLUSTERED BY (keycol) SORTED BY (keycol ASC) INTO 20 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' LINES TERMINATED BY '\n'
STORED AS RCFILE;

-- employee_text is a textfile-format staging table
FROM employee_text
INSERT OVERWRITE TABLE employee PARTITION (clnt_id)
SELECT concat(src_plfm_id, load_dt, emp_id, birth_day),
       src_plfm_id, emp_id, birth_day, age,
       addr1, addr2, addr3, phn_no, load_dt,
       clnt_id  -- dynamic partition column must come last
SORT BY concat(src_plfm_id, load_dt, emp_id, birth_day);

CREATE TABLE employee_error (
  keycol string,
  src_plfm_id int,
  emp_id int,
  birth_day string,
  error_done string,
  load_dt string)
PARTITIONED BY (clnt_id string)
CLUSTERED BY (keycol) SORTED BY (keycol ASC) INTO 20 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' LINES TERMINATED BY '\n'
STORED AS RCFILE;

FROM employee_error_text
INSERT OVERWRITE TABLE employee_error PARTITION (clnt_id)
SELECT concat(src_plfm_id, load_dt, emp_id, birth_day),
       src_plfm_id, emp_id, birth_day, error_done, load_dt,
       clnt_id  -- dynamic partition column must come last
SORT BY concat(src_plfm_id, load_dt, emp_id, birth_day);

CREATE TEMPORARY TABLE emp_error_temp AS
SELECT a.src_plfm_id, a.emp_id, a.birth_day, a.age,
       a.addr1, a.addr2, a.addr3, a.phn_no, a.load_dt,
       b.error_done
FROM employee a
JOIN employee_error b ON (a.keycol = b.keycol);
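A quick sanity check that the bucketing took effect, using Hive's TABLESAMPLE on the tables defined above (the client id '1' is a hypothetical value; each of the 20 buckets should return roughly 1/20th of the partition's rows):

```sql
-- count the rows landing in bucket 1 of 20 for one (hypothetical) partition
SELECT count(*)
FROM employee TABLESAMPLE(BUCKET 1 OUT OF 20 ON keycol)
WHERE clnt_id = '1';
```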
This join is creating 50*20 = 1000 mappers. Why?
You have 50 partitions and 20 buckets, so the total number of files is 50*20 = 1000. When using a bucket map join, each bucket file is processed individually, even if it is smaller than the 64/128 MB block size. So it definitely takes 1000 mappers.
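Since each of the 50 partitions holds 20 bucket files, the join reads 1000 files and schedules 1000 mappers. If the per-client partitioning is not needed for pruning, one option (matching what worked without partitions earlier in the thread) is to bucket only, so each table has just 20 files. A sketch reusing the employee columns, with clnt_id demoted to a regular column:

```sql
-- unpartitioned variant: 20 bucket files total instead of 50 x 20 = 1000
CREATE TABLE employee_bkt (
  keycol string, src_plfm_id int, emp_id int, birth_day string,
  age int, addr1 string, addr2 string, addr3 string,
  phn_no int, load_dt string, clnt_id string)
CLUSTERED BY (keycol) SORTED BY (keycol ASC) INTO 20 BUCKETS
STORED AS RCFILE;
```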