Created 02-27-2016 10:43 AM
Hi,
I am executing the below MapReduce program with KeyValueTextInputFormat and getting 0 records from the reducer. Could you help me identify the reason?
Attachments: urlcount.txt, urlcountm.txt, urlcountr.txt

Data:

http://url12.com 36
http://url11.com 4
http://url20.com 36
http://url1.com 256
http://url1.com 267
Thanks in advance!!!
Created on 02-28-2016 02:13 AM - edited 08-18-2019 04:54 AM
Excellent question! It's been a while since I'd touched MR, and I learned something new (KeyValueTextInputFormat). So first, assuming your data looks like this:
http://url12.com 36
http://url11.com 4
http://url20.com 36
http://url1.com 256
http://url1.com 267
The documentation for the KeyValueTextInputFormat class states the following:
An InputFormat for plain text files. Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Each line is divided into key and value parts by a separator byte. If no such a byte exists, the key will be the entire line and value will be empty.
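To make that rule concrete, here is a standalone sketch (my own illustration, not part of the job itself) of how the split works: the first occurrence of the separator byte divides the line, and if it never occurs, the whole line becomes the key and the value is empty.

import org.apache.hadoop.io.Text;

public class SeparatorDemo {

    // mimics how KeyValueTextInputFormat splits one line at the first separator
    static void split(String line, char separator) {
        int pos = line.indexOf(separator);
        Text key = new Text(pos == -1 ? line : line.substring(0, pos));
        Text value = new Text(pos == -1 ? "" : line.substring(pos + 1));
        System.out.println("key=[" + key + "] value=[" + value + "]");
    }

    public static void main(String[] args) {
        split("http://url12.com 36", ' ');   // space separator: key=[http://url12.com] value=[36]
        split("http://url12.com 36", '\t');  // default tab, absent here: key=[entire line] value=[]
    }
}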
You did not specify a separator in your job configuration. I made a few changes to your code:
package com.hortonworks.mapreduce;

/**
 *
 * @author aervits
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class URLCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new URLCount(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // split each input line on the first space instead of the default tab
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        Job job = Job.getInstance(conf, "URLCount");
        job.setJarByClass(getClass());
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(URLCountM.class);
        job.setReducerClass(URLCountR.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // output types match what URLCountR writes: Text key, IntWritable value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : -1;
    }
}
Notice that in your ToolRunner main you don't terminate the JVM with the job's exit code. In your run method I added setOutputKeyClass, setOutputValueClass, setMapOutputKeyClass and setMapOutputValueClass, set the separator config, and changed LongWritable to IntWritable. Why does that matter? Not setting the separator means KeyValueTextInputFormat treats the whole line as the key, so your mapper never receives a value. You thought you weren't getting results from the reducer, but in reality nothing was being passed from the mapper to the reducer in the first place. Moving on:
package com.hortonworks.mapreduce;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 *
 * @author aervits
 */
public class URLCountM extends Mapper<Text, Text, Text, IntWritable> {

    private static final Logger LOG = Logger.getLogger(URLCountM.class.getName());

    @Override
    public void map(Text key, Text value, Context context) {
        try {
            LOG.log(Level.INFO, "MAP_KEY: ".concat(key.toString())
                    .concat(" MAP_VALUE: ".concat(value.toString())));
            context.write(key, new IntWritable(Integer.valueOf(value.toString())));
        } catch (NumberFormatException | IOException | InterruptedException e) {
            LOG.log(Level.SEVERE, "ERROR: ".concat(e.toString()));
        }
    }
}
Notice I added a logger; this is a better way of printing the expected keys and values to the task logs.
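As an aside (my own suggestion, not from the original post): Hadoop's framework classes log through Apache Commons Logging, so a job class could use that instead of java.util.logging and its messages would be handled the same way as the framework's own. A minimal sketch:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LoggingAside {

    // same pattern Hadoop's own classes use for their task-log messages
    private static final Log LOG = LogFactory.getLog(LoggingAside.class);

    public static void main(String[] args) {
        LOG.info("MAP_KEY: http://url12.com MAP_VALUE: 36");
    }
}

Moving on to the reducer: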
package com.hortonworks.mapreduce;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 *
 * @author aervits
 */
public class URLCountR extends Reducer<Text, IntWritable, Text, IntWritable> {

    private static final Logger LOG = Logger.getLogger(URLCountR.class.getName());
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        LOG.log(Level.INFO, "REDUCER_VALUE: ".concat(result.toString()));
        context.write(key, result);
    }
}
Notice I'm printing to the log again in the reducer, just to keep my sanity; the MAP_KEY/MAP_VALUE and REDUCER_VALUE entries show up in the task logs.
Finally, the result looks like this:

http://url1.com 523
http://url11.com 4
http://url12.com 36
http://url20.com 36
Finally, I published the code to my repo; grab it if you need a working example. I compiled it with the HDP-specific versions of Hadoop, and using our repositories rather than Apache's is recommended. Take a look at my pom.xml.
https://github.com/dbist/URLCount
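Assuming the build produces a jar named URLCount-1.0.jar (the actual name depends on the pom.xml) and that the input/output paths below are placeholders, a typical invocation passes the two paths the driver's run method expects:

hadoop jar URLCount-1.0.jar com.hortonworks.mapreduce.URLCount /user/me/urlcount/input /user/me/urlcount/output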
One final hint: look at the statistics printed after the job completes. I realized you were not sending data to the reducer when I ran your code and saw 0 output records from the mapper. This is what it should look like with only 5 lines of input data:
Map-Reduce Framework
        Map input records=5
        Map output records=5
        Map output bytes=103
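The same counters can also be checked programmatically after waitForCompletion returns, using the standard TaskCounter enum; here is a small helper sketch (the class and method names are mine, not from the thread):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class CounterCheck {

    // prints the record counters; 0 map output records with non-zero map
    // input records is exactly the missing-separator symptom described above
    public static void printRecordCounters(Job job) throws IOException {
        Counters counters = job.getCounters();
        System.out.println("Map input records    = "
                + counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
        System.out.println("Map output records   = "
                + counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
        System.out.println("Reduce input records = "
                + counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
    }
}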
Created 02-27-2016 10:19 PM
These questions are almost always impossible to answer.
I would add a couple of System.out calls to your mapper and reducer to see whether data goes in or out. You can then see these messages in the ResourceManager UI (port 8088): click on your application, click through the attempt to the mapper or reducer, and then open the logs.
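As a sketch of that approach (my illustration, reusing the key/value types from the code above), a System.out in the mapper ends up in the task attempt's stdout log:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DebugMapper extends Mapper<Text, Text, Text, IntWritable> {

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // goes to the task attempt's stdout, visible via the RM UI log links
        System.out.println("MAP IN  key=[" + key + "] value=[" + value + "]");
        if (value.getLength() > 0) {
            context.write(key, new IntWritable(Integer.parseInt(value.toString())));
            System.out.println("MAP OUT key=[" + key + "] value=[" + value + "]");
        }
    }
}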
Created 02-27-2016 10:27 PM
I would maybe also suggest that you print out the line after that command to see if it is generating the expected output!
Created 02-28-2016 05:23 AM
Thank you!!!
I am happy to have joined this network, given the level of support provided. It keeps me motivated!!!