New Contributor
Posts: 2
Registered: ‎02-10-2017

What is the exact MapReduce workflow?

My summary from the book "Hadoop: The Definitive Guide" by Tom White is:

 

All the logic between the user's map function and the user's reduce function is called the shuffle; the shuffle therefore spans both the map and reduce sides. After the user's map() function runs, the output sits in an in-memory circular buffer. When the buffer is 80% full, a background thread starts to spill the buffer's contents to a spill file. Each spill file is partitioned by key, and within each partition the key-value pairs are sorted by key. After sorting, the combiner function is called if one is configured. All spill files are then merged into one MapOutputFile. Each map task's MapOutputFile is fetched over the network by the reduce tasks, the reduce side performs another sort, and then the user's reduce function is called.

 

So the questions are:

 

1.) According to the above summary, this is the flow:

 

Mapper--Partitioner--Sort--Combiner--Shuffle--Sort--Reducer--Output

 

1a.) Is this the flow or is it something else?

 

1b.) Can you explain the above flow with an example, say the word count example? (The ones I found online weren't very elaborate.)

 

2.) So the map phase's output is one big file (MapOutputFile)? And is it this one big file that is broken up so that the key-value pairs are passed on to the respective reducers?

 

3.) Why does the sorting happen a second time, when the data is already sorted and combined before being passed to the respective reducers?

 

4.) Say mapper1 runs on DataNode1; is it then necessary for reducer1 to run on DataNode1, or can it run on any DataNode?

Champion
Posts: 563
Registered: ‎05-16-2016

Re: What is the exact MapReduce workflow?

In a nutshell:

 

Map - reads an input split and assigns key-value pairs
Sort - sorts the map output based on the key
Shuffle - groups the elements based on the key and feeds all key-value pairs with the same key to one reducer
Reduce - typically a sum or average function; pushes one final key-value pair per key to the destination
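The four steps above can be sketched in plain Python (just a simulation of the flow, not the Hadoop API):

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    """Map: read each input record and emit (key, value) pairs."""
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle_and_sort(pairs):
    """Sort by key, then group all values with the same key together."""
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield (key, [v for _, v in group])

def reduce_phase(grouped):
    """Reduce: aggregate (here, sum) the values for each key."""
    for key, values in grouped:
        yield (key, sum(values))

lines = ["the quick brown fox", "the lazy dog"]
print(dict(reduce_phase(shuffle_and_sort(map_phase(lines)))))
# {'brown': 1, 'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}
```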

 

 

 

2. The framework decides how to split the input dataset into chunks; the component that does this is the InputFormat.

The InputFormat is also responsible for forming a key-value pair for each record.

3. For each input split a map task is assigned, which runs the user's map function on that split.
Tasks are executed in parallel, meaning the input splits are processed in parallel across the machines that make up the cluster.

4. Each key is assigned to a partition by a component called the Partitioner.
The default is the hash partitioner: the hash value of the key modulo the number of reducers gives the partition number.
For example, a key like INFO will always be placed in the same partition.
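A sketch of that idea in plain Python (not the Hadoop API; the simple character-sum hash below is a stand-in for a real hash function):

```python
def get_partition(key, num_reducers):
    """Assign a key to a partition: hash of the key modulo the number of reducers."""
    # Simple deterministic stand-in for a real hash function
    # (Python's built-in hash() of a str is salted per process).
    h = sum(ord(c) for c in key)
    return h % num_reducers

# The same key always hashes to the same partition, so every
# occurrence of "INFO" ends up at the same reducer.
partitions = [get_partition(k, 3) for k in ["INFO", "WARN", "INFO", "ERROR", "INFO"]]
print(partitions)  # [0, 0, 0, 1, 0]
```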


5. Each partition is processed by a single reducer, so the reducer first copies its share of the intermediate output data from each worker node.

6. The intermediate output data is then processed by the user's reduce code.

7. The shuffle and sort phase is performed by the reduce task before it executes the user-defined reduce function.

8. Once it has acquired the intermediate data, the reduce task performs a merge sort to produce one fully sorted list.
Each reducer then writes its result to a separate file in HDFS or the local file system.
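Step 8 can be sketched with Python's heapq.merge, which merges already-sorted lists much like the reduce task merges the already-sorted map outputs (a simulation, not the Hadoop API):

```python
import heapq

# Each mapper ships a list of (key, count) pairs that is already sorted by key.
map1 = [("and", 2), ("map", 2), ("the", 3)]
map2 = [("and", 1), ("the", 3), ("thread", 2)]

# heapq.merge does a merge sort over the sorted runs -- cheap, because
# each input is already in order (this is why the map-side sort matters).
merged = heapq.merge(map1, map2)

# Sum consecutive pairs with the same key, like the reduce phase does.
final = {}
for key, count in merged:
    final[key] = final.get(key, 0) + count
print(final)  # {'and': 3, 'map': 2, 'the': 6, 'thread': 2}
```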

 

 

Your query:

 

Say mapper1 runs on DataNode1; is it then necessary for reducer1 to run on DataNode1, or can it run on any DataNode?

 

The MapReduce framework honors data locality when scheduling map tasks.

A reduce task, however, pulls its input from many mappers over the network, so it can be run on any node with available resources.

 

 

Why does the sorting happen a second time, when the data is already sorted and combined before being passed to the respective reducers?

 

The second pass is a merge sort that produces the final sorted input for the reduce function. It is often better for performance to do the initial sorting locally on each mapper rather than leaving all of the sorting to the final reducer.

 

 

 

Posts: 642
Topics: 3
Kudos: 105
Solutions: 67
Registered: ‎08-16-2016

Re: What is the exact MapReduce workflow?

Let me take a stab at this.

1.) According to the above summary, this is the flow:

Mapper--Partitioner--Sort--Combiner--Shuffle--Sort--Reducer--Output

1a.) Is this the flow or is it something else?

 

This is the correct flow, although it is often abbreviated to Map -> Shuffle/Sort -> Reduce.  So anything that isn't a mapper or reducer is part of the Shuffle/Sort phase.

 

"Shuffle then spans across both map and reduce"

This is just noting that the phase includes logic that physically runs in both the mapper and the reducer.

1b.) Can you explain the above flow with an example, say the word count example? (The ones I found online weren't very elaborate.)

 

Example below.


2.) So the map phase's output is one big file (MapOutputFile)? And is it this one big file that is broken up so that the key-value pairs are passed on to the respective reducers?

 

Yes, it is one big file per mapper.  It isn't pushed to the reducers; each reducer receives a list of the mappers holding the partitions it is responsible for, then connects to each mapper and reads the data it needs.

 

3.) Why does the sorting happen a second time, when the data is already sorted and combined before being passed to the respective reducers?

 

There is a map-side sort and a reduce-side sort.  Each occurs on the side it is named for; the second one happens in the reducer.
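A small sketch of the two sorts (plain Python, not Hadoop code): each mapper sorts its own output, and the reducer's "second sort" is only a merge of those sorted streams:

```python
import heapq

# Unsorted raw output from two different mappers:
mapper_outputs = [
    [("the", 1), ("all", 1), ("map", 1)],
    [("the", 1), ("buffer", 1), ("and", 1)],
]

# Map-side sort: each mapper sorts its own spill before it is fetched.
sorted_runs = [sorted(run) for run in mapper_outputs]

# Reduce-side sort: a merge of already-sorted runs, not a full re-sort.
merged = list(heapq.merge(*sorted_runs))
print(merged)
# [('all', 1), ('and', 1), ('buffer', 1), ('map', 1), ('the', 1), ('the', 1)]
```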

4.) Say mapper1 runs on DataNode1; is it then necessary for reducer1 to run on DataNode1, or can it run on any DataNode?

 

Data locality only matters for the mappers.  A reducer will be run on any node that has resources available.  The reason is that it streams data in from one or more mappers, so there is very little benefit in placing it local to any one of them, and doing so would make scheduling more complex.

 

 

All the logic between user's map function and user's reduce function is called shuffle. Shuffle then spans across both map and reduce. After user's map() function, the output is in in-memory circular buffer. When the buffer is 80% full, the background thread starts to run. The background thread will output the buffer's content into a spill file. This spill file is partitioned by key. And within each partition, the key-value pairs are sorted by key.After sorting, if combiner function is enabled, then combiner function is called. All spill files will be merged into one MapOutputFile. And all Map tasks's MapOutputFile will be collected over network to Reduce task. Reduce task will do another sort. And then user's Reduce function will be called.

 

Let's run an MR wordcount job against this text block.  Arbitrarily, let's split it into four mappers and use two reducers.

 

Map #1: All the logic between user's map function and user's reduce function is called shuffle. Shuffle then spans across both map and reduce. After user's map() function, the output is in in-memory circular buffer.

 

MapOutputFile pre-sort:

All, 1

the, 1

logic, 1

between, 1

user's, 1

map, 1

function, 1

and, 1

user's, 1

reduce, 1

function, 1

is, 1

called, 1

shuffle, 1

Shuffle, 1

then, 1

spans, 1

across, 1

both, 1

map, 1

and, 1

reduce, 1

After, 1

user's, 1

map(), 1

function, 1

the, 1

output, 1

is, 1

in, 1

in-memory, 1

circular, 1

 

MapOutputFile sorted and combined:

across, 1

After, 1

All, 1

and, 2

between, 1

both, 1

called, 1

circular, 1

function, 3

in, 1

in-memory, 1

is, 2

logic, 1

map, 2

map(), 1

output, 1

reduce, 2

shuffle, 1

Shuffle, 1

spans, 1

the, 3

then, 1

user's, 3

 

 

Map #2: When the buffer is 80% full, the background thread starts to run. The background thread will output the buffer's content into a spill file. This spill file is partitioned by key.

 

MapOutputFile:

80%, 1

a, 1

background, 2

buffer, 1

buffer's, 1

by, 1

content, 1

file, 2

full, 1

into, 1

is, 1

key, 1

output, 1

partitioned, 1

 

run, 1

spill, 2

starts, 1

the, 3

The, 1

This, 1

thread, 2

to, 1

When, 1

will, 1

 

Map #3: And within each partition, the key-value pairs are sorted by key.After sorting, if combiner function is enabled, then combiner function is called. All spill files will be merged into one MapOutputFile.

 

MapOutputFile:

After, 1

All, 1

And, 1

are, 1

be, 1

by, 1

called, 1

combiner, 2

each, 1

enabled, 1

files, 1

function, 2

if, 1

into, 1

is, 2

key, 1

key-value, 1

MapOutputFile, 1

merged, 1

one, 1

pairs, 1

partition, 1

sorted, 1

sorting, 1

spill, 1

the, 1

then, 1

will, 1

within, 1

 

Map #4: And all Map tasks's MapOutputFile will be collected over network to Reduce task. Reduce task will do another sort. And then user's Reduce function will be called.

 

MapOutputFile:

all, 1

And, 2

another, 1

be, 2

called, 1

collected, 1

do, 1

function, 1

Map, 1

MapOutputFile, 1

network, 1

over, 1

Reduce, 3

sort, 1

task, 2

tasks's, 1

then, 1

to, 1

user's, 1

will, 3

 

Reduce #1 (keys 0 - L):

 

Pre-sort and merge:

 

across, 1

After, 1

All, 1

and, 2

between, 1

both, 1

called, 1

circular, 1

function, 3

in, 1

in-memory, 1

is, 2

logic, 1

80%, 1

a, 1

background, 2

buffer, 1

buffer's, 1

by, 1

content, 1

file, 2

full, 1

into, 1

is, 1

key, 1

After, 1

All, 1

And, 1

are, 1

be, 1

by, 1

called, 1

combiner, 2

each, 1

enabled, 1

files, 1

function, 2

if, 1

into, 1

is, 2

key, 1

key-value, 1

all, 1

And, 2

another, 1

be, 2

called, 1

collected, 1

do, 1

function, 1

 

Post sort and merge:

80%, 1
a, 1
across, 1
After, 2
all, 1
All, 2
and, 2
And, 3
another, 1
are, 1
background, 2
be, 3
between, 1
both, 1
buffer, 1
buffer's, 1
by, 2
called, 3
circular, 1
collected, 1
combiner, 2
content, 1
do, 1
each, 1
enabled, 1
file, 3
full, 1
function, 6
if, 2
in, 1
in-memory, 1
into, 2
is, 4
key, 2
key-value, 1
logic, 1

 

Reduce #2 (keys M - Z):

 

map, 2
Map, 1
MapOutputFile, 2
map(), 1
merged, 1
network, 1
one, 1
output, 1
over, 1
pairs, 1
partition, 1
partitioned, 1
reduce, 2
Reduce, 3
run, 1
shuffle, 1
Shuffle, 1
sort, 1
sorted, 1
sorting, 1
spans, 1
spill, 3
starts, 1
task, 2
tasks's, 1
the, 7
The, 1
then, 3
This, 1
thread, 2
to, 2
user's, 4
When, 1
will, 5
within, 1

 

This isn't the best format to give an example, but hopefully it helps a bit.  I chose quite a large example because I thought it would help to see multiple reducers in place.  I obviously truncated the work for the later mappers and reducers.  I also don't know how the actual code handles punctuation or capitalization, so I just winged it.
