How do you force the number of reducers in a map reduce job to be higher?

avatar

I'm attempting to copy 30 billion rows from one Hive table into another Hive table. Both tables have the same definition and are partitioned on date (DT). Currently there are 1173 partitions. I'm using the following query: insert into accesslog_new PARTITION (DT) select * from accesslog;

This query has been running for almost 3 days straight on a cluster with 18 data nodes.

My issue is that the Map-Reduce job only creates one reducer step. Btw, we are using MR2. I'm guessing this is drastically slowing things down. Is there a way to force the number of reducers to be much larger? How do you also figure out what an appropriate number of reducers would be for that volume of data?

avatar
Guru

you can force reducers by setting

SET mapreduce.job.reduces=XX

avatar
Master Guru

@Kevin Sievers - According to the official Apache documentation:

By default the number of reducers is set to 1; you can override it as described in the answer given by Laurent above.

How Many Reduces?

The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>).

With 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75 the faster nodes will finish their first round of reduces and launch a second wave of reduces doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers to reserve a few reduce slots in the framework for speculative-tasks and failed tasks.
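As a rough illustration of the rule of thumb quoted above, here is a minimal Python sketch. The 18 nodes come from the question; the figure of 4 containers per node is purely an assumption for the example, so substitute the real value from your YARN configuration.

```python
import math
from typing import Tuple


def suggested_reducers(nodes: int, containers_per_node: int) -> Tuple[int, int]:
    """Return (single-wave, two-wave) reducer counts from the 0.95 / 1.75 rule of thumb."""
    slots = nodes * containers_per_node
    return math.floor(0.95 * slots), math.floor(1.75 * slots)


# 18 data nodes is from the question; 4 containers per node is an assumed value.
low, high = suggested_reducers(nodes=18, containers_per_node=4)
print(low, high)
```

The lower figure lets every reducer start in a single wave; the higher one trades some framework overhead for better load balancing.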

avatar
Expert Contributor

for old api set mapred.reduce.tasks=N

for new api set mapreduce.job.reduces=N

avatar
Master Guru

Perhaps one thing in advance: are you sure that there are reducers? Normally I would assume your job would just run with a LOT of mappers and no reducers at all. Especially since you are going from one partitioned table to another, this should be OK. ( It would be terrible going from a randomly ordered input data set to a partitioned table, but that's a different story. ) An explain of your query would be nice ( i.e. run EXPLAIN INSERT INTO ... and see why Hive adds the reducer ).

How old is your hive?

Initial loads in partitioned tables are always tricky, here is my shameless plug presentation:

http://www.slideshare.net/BenjaminLeonhardi/hive-loading-data

If you set the number of reducers

SET mapred.reduce.tasks = x;

you should ALSO set the distribution, otherwise every reducer will try to write to all 1173 partitions, which will kill it.

INSERT .... SELECT * FROM xxx DISTRIBUTE BY DT;
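To see why distributing by the partition column helps, here is a small Python simulation of hash-based routing. It is only a sketch: the partition keys are made up, and `zlib.crc32` is a stand-in chosen for reproducibility, not the hash function Hive actually uses.

```python
import zlib
from collections import defaultdict


def assign_partitions(partition_values, num_reducers):
    """Map each partition value to one reducer, as a hash-based DISTRIBUTE BY would."""
    by_reducer = defaultdict(set)
    for value in partition_values:
        # crc32 is an assumed stand-in; Hive applies its own hash to the DISTRIBUTE BY key.
        reducer = zlib.crc32(value.encode("utf-8")) % num_reducers
        by_reducer[reducer].add(value)
    return by_reducer


# 1173 hypothetical partition keys standing in for the DT values in the question.
partitions = [f"dt{i:04d}" for i in range(1173)]
layout = assign_partitions(partitions, num_reducers=10)

# All rows with the same key go to the same reducer, so each reducer opens
# writers only for its own share of the partitions rather than all 1173.
print(max(len(keys) for keys in layout.values()))
```

With 10 reducers each one still handles on the order of a hundred partitions, which is why the reducer count and the distribution should be chosen together.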

Regarding the number of reducers: check how much memory you need for each task ( for ORC I like 8GB ) and divide your available cluster memory by it. Or choose 1173 if that fails. Hive might run them one after another, but each would only write to one partition.
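The memory-based estimate above can be sketched in a few lines of Python. The 8 GB per task is the figure from this post and the 18 nodes are from the question; the 64 GB of YARN memory per node is a hypothetical value used only to make the arithmetic concrete.

```python
def reducers_by_memory(cluster_memory_gb: float, memory_per_task_gb: float = 8.0) -> int:
    """Number of reduce tasks that fit in memory at once (at least 1)."""
    return max(1, int(cluster_memory_gb // memory_per_task_gb))


# 18 nodes is from the question; 64 GB per node is an assumed, hypothetical figure.
cluster_memory_gb = 18 * 64
print(reducers_by_memory(cluster_memory_gb))
```

If the number this gives does not work out, the fallback suggested above is simply one reducer per partition, i.e. 1173.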

Another way to fix it is to use

set hive.optimize.sort.dynamic.partition=true;

Which should theoretically do everything for you but which is not as flexible.

Also have a look into the presentation to do correct sorting and please enable Tez :-).

avatar

I ran the following yesterday afternoon and it took about the same time as the original copy. Here are the results, which are about the same as the original test that set neither mapred.reduce.tasks nor the distribute clause: 5.5 hours to copy the table with about 750 million rows in our test system. Are my expectations off base that this should be something Hadoop can do much faster?

SET MAPRED.REDUCE.TASKS=10;

insert into accesslog.accesslog_new_test PARTITION (DT) select * from accesslog.accesslog DISTRIBUTE BY DT;

Status: Running (Executing on YARN cluster with App id application_1460584845937_0014)

--------------------------------------------------------------------------------

VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED

--------------------------------------------------------------------------------

Map 1 .......... SUCCEEDED 1 1 0 0 0 0

Reducer 2 ...... SUCCEEDED 356 356 0 0 0 0

Reducer 3 ...... SUCCEEDED 1 1 0 0 0 0

--------------------------------------------------------------------------------

VERTICES: 03/03 [==========================>>] 100% ELAPSED TIME: 19286.39 s

--------------------------------------------------------------------------------

Loading data to table accesslog.accesslog_new_test partition (dt=null)

Time taken for load dynamic partitions : 15703

Loading partition {dt=140125}

Loading partition {dt=140124}

Loading partition {dt=140202}

Loading partition {dt=130210}

...

Loading partition {dt=140123}

Loading partition {dt=160214}

Loading partition {dt=140105}

Time taken for adding to write entity : 27

Partition accesslog.accesslog_new_test{dt=130129} stats: [numFiles=1, numRows=2686851, totalSize=46373620, rawDataSize=0]

Partition accesslog.accesslog_new_test{dt=130130} stats: [numFiles=1, numRows=14329847, totalSize=240074232, rawDataSize=0]

Partition accesslog.accesslog_new_test{dt=130131} stats: [numFiles=1, numRows=14059931, totalSize=235402253, rawDataSize=0]

Partition accesslog.accesslog_new_test{dt=130201} stats: [numFiles=1, numRows=12976777, totalSize=223349207, rawDataSize=0]

Partition accesslog.accesslog_new_test{dt=130202} stats: [numFiles=1, numRows=4335922, totalSize=71477106, rawDataSize=0]

...

Partition accesslog.accesslog_new_test{dt=160229} stats: [numFiles=1, numRows=11316976, totalSize=217067108, rawDataSize=0]

Partition accesslog.accesslog_new_test{dt=160301} stats: [numFiles=1, numRows=8730842, totalSize=162107356, rawDataSize=0]

OK

Time taken: 19320.464 seconds
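For a sense of scale, the figures in this post can be turned into a back-of-the-envelope throughput number. This Python sketch uses only values quoted in the thread (about 750 million rows, 19320.464 seconds, 30 billion rows in the full table) and assumes, unrealistically, that the same rate would hold on the full data set and cluster.

```python
rows_copied = 750_000_000         # "about 750 million rows" in the test system
elapsed_seconds = 19320.464       # "Time taken" reported by Hive above
production_rows = 30_000_000_000  # table size from the original question

rows_per_second = rows_copied / elapsed_seconds

# Assumption: throughput stays constant when scaling up to the full table.
projected_days = production_rows / rows_per_second / 86_400

print(f"{rows_per_second:,.0f} rows/s, roughly {projected_days:.1f} days for the full table")
```

A rate in the tens of thousands of rows per second is what a single task would deliver, which fits the single-mapper, single-final-reducer picture in the vertex table.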

avatar

By the way, here is the explain plan:

hive> SET MAPRED.REDUCE.TASKS=10;

hive> explain insert into accesslog.accesslog_new_test PARTITION (DT) select * from accesslog.accesslog DISTRIBUTE BY DT;

OK

Plan not optimized by CBO.

Vertex dependency in root stage

Reducer 2 <- Map 1 (SIMPLE_EDGE)

Reducer 3 <- Reducer 2 (SIMPLE_EDGE)

Stage-3

Stats-Aggr Operator

Stage-0

Move Operator

partition:{}

table:{"serde:":"org.apache.hadoop.hive.ql.io.orc.OrcSerde","name:":"accesslog.accesslog_new_test","input format:":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","output format:":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"}

Stage-2

Dependency Collection{}

Stage-1

Reducer 3

File Output Operator [FS_6]

compressed:true

Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL

table:{"serde:":"org.apache.hadoop.hive.ql.io.orc.OrcSerde","name:":"accesslog.accesslog_new_test","input format:":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","output format:":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"}

Select Operator [SEL_5]

| outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]

| Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL

|<-Reducer 2 [SIMPLE_EDGE]

Reduce Output Operator [RS_4]

Map-reduce partition columns:_col1 (type: string)

sort order:

Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL

value expressions:_col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: map<string,string>), _col12 (type: string)

Select Operator [SEL_3]

| outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]

| Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL

|<-Map 1 [SIMPLE_EDGE]

Reduce Output Operator [RS_2]

Map-reduce partition columns:_col12 (type: string)

sort order:

Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL

value expressions:_col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: map<string,string>), _col12 (type: string)

Select Operator [SEL_1]

outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]

Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL

TableScan [TS_0]

alias:accesslog

Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL

avatar
Master Guru

@Kevin Sievers

Hi Kevin,

your commands look good to me, but somehow Hive does not pick up the number of reduce tasks. You are right, Hadoop should be MUCH faster. The single reduce task and, even weirder, the single mapper seem to be the problem.

I use the commands at the end of this post, and I assure you the job runs with a lot of mappers and 40 reducers and loads and transforms around 300 GB of data in 20 minutes on a 7-datanode cluster.

So basically I have NO idea why Hive uses only one mapper, I have no idea why it has the second reducer AT ALL, and I have no idea why it ignores the mapred.reduce.tasks parameter.

I think a support ticket might be in order.

set hive.tez.java.opts = "-Xmx3600m";
set hive.tez.container.size = 4096;
set mapred.reduce.tasks=120;
CREATE EXTERNAL TABLE  STAGING ...
...
insert into TABLE TARGET partition (day = 20150811) SELECT * FROM STAGING distribute by DT ;

avatar

Thanks for the responses guys. I'm still working through this on our test cluster with a smaller set of data (750+ million rows), but so far I've not had any luck.

Apologies for the formatting issues, but here's an example along with the explain plan. I wanted to give it enough time to hit the reducer step before I declared victory or defeat on the 'set mapreduce.job.reduces=10;' setting.

I'll try more options when I get the time.

hive> set mapreduce.job.reduces=10;

hive> explain insert into accesslog_new PARTITION (DT) select * from accesslog;

OK

Plan not optimized by CBO.

Vertex dependency in root stage
Reducer 2 <- Map 1 (SIMPLE_EDGE)

Stage-3
Stats-Aggr Operator
Stage-0
Move Operator
partition:{}
table:{"serde:":"org.apache.hadoop.hive.ql.io.orc.OrcSerde","name:":"accesslog.accesslog_new","input format:":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","output format:":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"}
Stage-2
Dependency Collection{}
Stage-1
Reducer 2
File Output Operator [FS_4]
compressed:true
Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL
table:{"serde:":"org.apache.hadoop.hive.ql.io.orc.OrcSerde","name:":"accesslog.accesslog_new","input format:":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","output format:":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"}
Select Operator [SEL_3]
| outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]
| Statistics:Num rows: 759094090 Data size: 139673312560 Basic stats: COMPLETE Column stats: PARTIAL
|<-Map 1 [SIMPLE_EDGE]
Reduce Output Operator [RS_2]
Map-reduce partition columns:_col1 (type: string)
sort order:
Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL
value expressions:_col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: string), _col5 (type: string), _col6 (type: string), _col7 (type: string), _col8 (type: string), _col9 (type: string), _col10 (type: string), _col11 (type: map<string,string>), _col12 (type: string)
Select Operator [SEL_1]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"]
Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL
TableScan [TS_0]
alias:accesslog
Statistics:Num rows: 759094090 Data size: 11901499527 Basic stats: COMPLETE Column stats: PARTIAL

Time taken: 6.483 seconds, Fetched: 35 row(s)

hive> insert into accesslog_new PARTITION (DT) select * from accesslog;

Query ID = svchadoop_20160418123532_e3be4118-b866-4b88-ae52-42d1e574a9fe
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.

Status: Running (Executing on YARN cluster with App id application_1460584845937_0011)
--------------------------------------------------------------------------------
VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
--------------------------------------------------------------------------------
Map 1 .......... SUCCEEDED 1 1 0 0 0 0
Reducer 2 RUNNING 1 0 1 0 0 0
--------------------------------------------------------------------------------
VERTICES: 01/02 [=============>>-------------] 50% ELAPSED TIME: 5094.71 s
--------------------------------------------------------------------------------

avatar
Master Guru

mapred.reduce.tasks ( like I wrote )

or

mapreduce.job.reduces ( the newer name, spelled as Laurent gave it )