Created 11-07-2017 11:08 AM
Dear Community,
I have a MapReduce job that processes a 1.8 TB data set. My map tasks generate around 2.5 TB of intermediate data, and the number of distinct keys easily crosses a billion. I have set the split size to 128 MB, so the total number of splits generated is approximately 14,000. I have set the number of reducers to 166. My cluster has 8 nodes: 7 are data nodes and 1 is a name node. Each data node has 24 logical cores and 128 GB RAM. When the job runs with this configuration, the map phase completes, but the reduce phase gets stuck at 26%. May I know what split size and how many reducers I should use for this particular problem with my current cluster size? Please provide suggestions. Thanks.
Created 11-07-2017 11:33 AM
Let's start with the basics and try to answer your questions.
1. Split size = HDFS block size by default. Changing the split size affects the number of mappers, not the number of reducers.
A 128 MB split size is good to start with.
2. Rule of thumb: a reducer should ideally process about 1 GB of data. Going by this logic you should have 2.5 TB / 1 GB = ~2,500 reducers.
3. You have 20 * 7 = 140 containers available in one wave to run reducers, so running 2,500 reducers would take 2,500 / 140 ≈ 18 waves, which is a lot. Hence I would fix the number of reducers somewhere around 800 to 900 (see the sketch after this list for where to apply it).
4. Your mappers are producing more data as intermediate output than they read. What are you doing in this step? Can you use a combiner (if possible) to make this intermediate data smaller? Can you move some filter operations to the map stage?
5. If your reducers are getting stuck at 26%, there can be several reasons:
1. You have a skewed key, which results in one reducer getting stuck.
2. 26% means it is stuck in the shuffle phase itself, which indicates that one reducer is receiving a lot of data (another sign of skew).
3. Have you enabled compression for the map output?
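A minimal sketch, assuming a plain Job driver (the class and job names below are hypothetical), of where the 128 MB split cap from item 1 and the reducer count from item 3 would be applied:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap the split size at 128 MB (value is in bytes).
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
        Job job = Job.getInstance(conf, "aggregation-job");
        // Apply the reducer count from the calculation above (800-900 range).
        job.setNumReduceTasks(850);
        // The same count can be set from the command line with
        // -D mapreduce.job.reduces=850 if the driver uses ToolRunner.
    }
}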
Created 11-07-2017 12:42 PM
kgautam, Thanks for your reply.
1) Currently, I'm not using any combiner. My map phase output <key,value> pair is <string/text, string/text>. As my value is a string/text, I think it will be difficult to write a combiner. Usually, the combiner performs the same function as the reducer, and here I'm not able to think of a way to write a combiner for this particular problem.
2) Currently, we tried compressing the map output with "-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec". Is this configuration enough to compress the map output? Do we have to modify or add some statements in our MapReduce code to use this compression?
3) May I know where you got the rule of thumb that "a reducer should process 1 GB of data"?
4) Since I have 24 logical cores per data node, why did you mention 20 * 7? I think it should be 24 * 7?
5) How do I handle a skewed key? Can I handle it using a partitioner? Is there any other way?
Thanks.
Created 11-07-2017 01:10 PM
1) Currently, I'm not using any combiner. My map phase output <key,value> pair is <string/text, string/text>. As my value is a string/text, I think it will be difficult to write a combiner. Usually, the combiner performs the same function as the reducer, and here I'm not able to think of a way to write a combiner for this particular problem.
Is it possible to emit <key, List<String>>? You can byte-serialize the values, or use a Thrift definition to hold the values together in one structure.
You save by not emitting the same key again. Use this if your specific use case permits.
A mapper generating more data than its input means it is emitting more records than it receives, which (generally speaking) means key duplication is happening.
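As a rough illustration only (the class name, the delimiter and the assumption that your reduce logic tolerates pre-grouped values are all mine, not taken from your job), a combiner along these lines would pack every value seen for a key on one mapper into a single record, cutting down the number of intermediate records:
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: concatenates all map-side values for a key into one
// delimiter-separated Text record. The reducer must split on the same delimiter.
public class PackingCombiner extends Reducer<Text, Text, Text, Text> {
    private static final String SEP = "\u0001"; // assumed to never occur in the values
    private final Text packed = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text v : values) {
            if (sb.length() > 0) sb.append(SEP);
            sb.append(v.toString());
        }
        packed.set(sb.toString());
        context.write(key, packed); // one record per key instead of many
    }
}
Register it with job.setCombinerClass(PackingCombiner.class); this only helps if the reducer is also updated to split the packed values again.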
2) Currently, we tried compressing the map output with "-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec". Is this configuration enough to compress the map output? Do we have to modify or add some statements in our MapReduce code to use this compression?
conf.set("mapreduce.compress.map.output", "true") conf.set("mapreduce.output.compression.type", "BLOCK"); conf.set("mapreduce.map.output.compression.codec", "Use one enabled in your cluster LZ4/SNAPPY/GzipCodec");
3) May I know where you got the rule of thumb that "a reducer should process 1 GB of data"?
This is the default used in frameworks like Pig, and reducer heap is generally on the order of 1.5 GB. It is a default number; please tune it according to your needs.
Time taken to process X amount of data = time to spawn the process (scheduling time) + time to do I/O on the files + time to run the processing logic. 1 GB is roughly the size below which the spawning time becomes comparable to the time spent on actual processing.
4) Since I have 24 logical cores per data node, why did you mention 20 * 7? I think it should be 24 * 7?
I left 4 * 7 containers for other services and other jobs running in your cluster. (It is always good to underestimate when doing performance calculations.)
5) How do I handle a skewed key? Can I handle it using a partitioner? Is there any other way?
There are many ways; the best is to have a look at your skewed keys and come to a logical conclusion.
1. Look at Pig's skewed-join implementation: add a salt to your key and then reduce twice, divide and conquer (see the sketch after this list).
2. Take the top N events for a given key (if your logic permits).
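A minimal salting sketch, assuming a first-pass mapper over tab-separated "<key>\t<value>" records (the class name and record layout are assumptions); a second MapReduce pass, not shown, strips the "#<salt>" suffix and merges the partial results per original key:
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical pass-1 mapper: spreads each key over SALT_BUCKETS sub-keys so a
// single hot key is handled by several reducers instead of one.
public class SaltingMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static final int SALT_BUCKETS = 20; // assumed fan-out, tune for your skew
    private final Random rnd = new Random();
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split("\t", 2);
        if (parts.length < 2) return; // skip malformed records
        outKey.set(parts[0] + "#" + rnd.nextInt(SALT_BUCKETS));
        outValue.set(parts[1]);
        context.write(outKey, outValue);
    }
}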
Created 11-08-2017 09:22 AM
Thanks for your suggestions. I will try to incorporate them and come back to you with more questions!