Expected output not produced after running a MapReduce program

/* A sample MapReduce program: for each tab-separated input record
   "bag1<TAB>bag2<TAB>bag3<TAB>label", emit the label as key and
   "avg(bag1)<TAB>max(bag2)<TAB>min(bag3)" as value. */
import java.io.IOException;
import java.util.Scanner;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
class MapperAggregate extends Mapper<LongWritable, Text, Text, Text> {
  String bag1 = "", bag2 = "", bag3 = "", lbl = "";
  String bg1 = "", bg2 = "", bg3 = "";
  Double sum, avg, max, min, count, tempint;
  Text word1 = new Text();
  Text word2 = new Text();

  // NOTE: this signature (OutputCollector/Reporter) comes from the old
  // org.apache.hadoop.mapred API; the new org.apache.hadoop.mapreduce.Mapper
  // that this class extends declares map(KEYIN, VALUEIN, Context) instead,
  // so this method does not override Mapper.map.
  public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
                  Reporter reporter) throws IOException {
    String outputText = processRecord(value.toString());
    StringTokenizer itr = new StringTokenizer(outputText, "\t");
    word1.set(itr.nextToken());   // label
    bg1 = itr.nextToken();        // average of bag1
    bg2 = itr.nextToken();        // maximum of bag2
    bg3 = itr.nextToken();        // minimum of bag3
    word2.set(bg1 + "\t" + bg2 + "\t" + bg3);
    output.collect(word1, word2);
  }
  // Parses one record "bag1<TAB>bag2<TAB>bag3<TAB>label" and returns
  // "label<TAB>avg(bag1)<TAB>max(bag2)<TAB>min(bag3)".
  public String processRecord(String record) {
    StringBuilder outputRecord = new StringBuilder();
    Scanner scanner = new Scanner(record);
    scanner.useDelimiter("\t");
    if (scanner.hasNext()) {
      bag1 = scanner.next();
      bag2 = scanner.next();
      bag3 = scanner.next();
      lbl = scanner.next();
    }
    StringTokenizer itr1 = new StringTokenizer(bag1, ",");
    StringTokenizer itr2 = new StringTokenizer(bag2, ",");
    StringTokenizer itr3 = new StringTokenizer(bag3, ",");
    // Average of the first bag.
    sum = 0.0;
    count = 0.0;
    while (itr1.hasMoreTokens()) {
      sum += Double.parseDouble(itr1.nextToken());
      count++;
    }
    avg = sum / count;
    // Maximum of the second bag.
    max = Double.parseDouble(itr2.nextToken());
    while (itr2.hasMoreTokens()) {
      tempint = Double.parseDouble(itr2.nextToken());
      if (max < tempint)
        max = tempint;
    }
    // Minimum of the third bag.
    min = Double.parseDouble(itr3.nextToken());
    while (itr3.hasMoreTokens()) {
      tempint = Double.parseDouble(itr3.nextToken());
      if (min > tempint)
        min = tempint;
    }
    outputRecord.append(lbl);
    outputRecord.append("\t");
    outputRecord.append(avg.toString());
    outputRecord.append("\t");
    outputRecord.append(max.toString());
    outputRecord.append("\t");
    outputRecord.append(min.toString());
    return outputRecord.toString();
  }
}
public class LabelPredict extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "labelprediction");
    job.setJarByClass(LabelPredict.class);
    job.setMapperClass(MapperAggregate.class);
    // No setReducerClass(...): the default identity Reducer passes pairs through.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // KeyValueTextInputFormat splits each line at the first tab and supplies
    // Text keys and Text values to the mapper.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;   // exit status is handled in main()
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new LabelPredict(), args);
    System.exit(res);
  }
}
        
INPUT FILE (dilinput.txt):
 1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
8,6	11,15	12,14,17	good
14,17,12	10,6	11,7,8	ill
22,16,19	12,8	10,6	ill
17,11	23,19,21	14,9,10	good
24,21	11,15	10,9,8	ill

EXECUTION:

[root@quickstart cloudera]# javac -classpath /usr/lib/hadoop/hadoop-annotations-2.6.0-cdh5.5.0.jar:/usr/lib/hadoop/hadoop-common-2.6.0-cdh5.5.0.jar:/usr/lib/hadoop-0.20-mapreduce/hadoop-core-2.6.0-mr1-cdh5.5.0.jar LabelPredict.java
[root@quickstart cloudera]# jar -cvf /home/cloudera/labeling.jar -C /home/cloudera/labeling/ .
added manifest
adding: MapperAggregate.class(in = 3089) (out= 1470)(deflated 52%)
adding: LabelPredict.class(in = 1768) (out= 862)(deflated 51%)
[root@quickstart cloudera]# hadoop jar /home/cloudera/labeling.jar LabelPredict /dilinput.txt /diloutput9.txt
15/12/22 03:21:22 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/12/22 03:21:23 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/12/22 03:21:23 INFO input.FileInputFormat: Total input paths to process : 1
15/12/22 03:21:23 INFO mapreduce.JobSubmitter: number of splits:1
15/12/22 03:21:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1450780152769_0002
15/12/22 03:21:24 INFO impl.YarnClientImpl: Submitted application application_1450780152769_0002
15/12/22 03:21:25 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1450780152769_0002/
15/12/22 03:21:25 INFO mapreduce.Job: Running job: job_1450780152769_0002
15/12/22 03:21:39 INFO mapreduce.Job: Job job_1450780152769_0002 running in uber mode : false
15/12/22 03:21:39 INFO mapreduce.Job:  map 0% reduce 0%
15/12/22 03:21:50 INFO mapreduce.Job:  map 100% reduce 0%
15/12/22 03:22:02 INFO mapreduce.Job:  map 100% reduce 100%
15/12/22 03:22:02 INFO mapreduce.Job: Job job_1450780152769_0002 completed successfully
15/12/22 03:22:03 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1248
		FILE: Number of bytes written=225659
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1246
		HDFS: Number of bytes written=1140
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=8996
		Total time spent by all reduces in occupied slots (ms)=9480
		Total time spent by all map tasks (ms)=8996
		Total time spent by all reduce tasks (ms)=9480
		Total vcore-seconds taken by all map tasks=8996
		Total vcore-seconds taken by all reduce tasks=9480
		Total megabyte-seconds taken by all map tasks=9211904
		Total megabyte-seconds taken by all reduce tasks=9707520
	Map-Reduce Framework
		Map input records=51
		Map output records=51
		Map output bytes=1140
		Map output materialized bytes=1248
		Input split bytes=109
		Combine input records=0
		Combine output records=0
		Reduce input groups=9
		Reduce shuffle bytes=1248
		Reduce input records=51
		Reduce output records=51
		Spilled Records=102
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=201
		CPU time spent (ms)=1950
		Physical memory (bytes) snapshot=353787904
		Virtual memory (bytes) snapshot=3008630784
		Total committed heap usage (bytes)=226365440
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=1137
	File Output Format Counters 
		Bytes Written=1140

OUTPUT I GOT:

[root@quickstart cloudera]# hadoop fs -cat /diloutput9.txt/part-r-00000	
	
	
1,2	23,17,15	11,9	good
1,2	23,17,15	11,9	good
1,2	23,17,15	11,9	good
1,2	23,17,15	11,9	good
1,2	23,17,15	11,9	good
1,2	23,17,15	11,9	good
12	11,8	12,7,8	ill
12	11,8	12,7,8	ill
12	11,8	12,7,8	ill
12	11,8	12,7,8	ill
12	11,8	12,7,8	ill
12	11,8	12,7,8	ill
14,12,9	8,6,4	24,18	ill
14,12,9	8,6,4	24,18	ill
14,12,9	8,6,4	24,18	ill
14,12,9	8,6,4	24,18	ill
14,12,9	8,6,4	24,18	ill
14,12,9	8,6,4	24,18	ill
14,17,12	10,6	11,7,8	ill
14,17,12	10,6	11,7,8	ill
14,17,12	10,6	11,7,8	ill
14,17,12	10,6	11,7,8	ill
14,17,12	10,6	11,7,8	ill
14,17,12	10,6	11,7,8	ill
17,11	23,19,21	14,9,10	good
17,11	23,19,21	14,9,10	good
17,11	23,19,21	14,9,10	good
17,11	23,19,21	14,9,10	good
17,11	23,19,21	14,9,10	good
17,11	23,19,21	14,9,10	good
22,16,19	12,8	10,6	ill
22,16,19	12,8	10,6	ill
22,16,19	12,8	10,6	ill
22,16,19	12,8	10,6	ill
22,16,19	12,8	10,6	ill
22,16,19	12,8	10,6	ill
24,21	11,15	10,9,8	ill
24,21	11,15	10,9,8	ill
24,21	11,15	10,9,8	ill
24,21	11,15	10,9,8	ill
24,21	11,15	10,9,8	ill
24,21	11,15	10,9,8	ill
8,6	11,15	12,14,17	good
8,6	11,15	12,14,17	good
8,6	11,15	12,14,17	good
8,6	11,15	12,14,17	good
8,6	11,15	12,14,17	good
8,6	11,15	12,14,17	good
[root@quickstart cloudera]# 

According to the logic in the program, the input line:

8,6	11,15	12,14,17	good

should produce:

good	7.0	15.0	12.0

(first field: the label; second: the average of 8,6; third: the maximum of 11,15; fourth: the minimum of 12,14,17)

It should emit one line of this form for every input record.
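
To make that expected line concrete, here is a quick arithmetic check in plain Java, outside Hadoop (the class name RecordCheck is made up for illustration):

// Recomputes avg/max/min for the record "8,6	11,15	12,14,17	good".
public class RecordCheck {
  public static void main(String[] args) {
    double avg = (8 + 6) / 2.0;                    // average of the first bag
    double max = Math.max(11, 15);                 // maximum of the second bag
    double min = Math.min(12, Math.min(14, 17));   // minimum of the third bag
    System.out.println("good\t" + avg + "\t" + max + "\t" + min);
    // Prints: good	7.0	15.0	12.0
  }
}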


So, please help me rectify the problem, or suggest what I should change.
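
A likely cause, judging from the output above: map() takes OutputCollector and Reporter (the old org.apache.hadoop.mapred API), while the class extends the new org.apache.hadoop.mapreduce.Mapper, whose method is map(KEYIN key, VALUEIN value, Context context). The old-style method therefore never overrides Mapper.map, the framework runs the built-in identity mapper instead, and since no reducer class is set, the identity reducer echoes every record back. That is exactly the sorted copy of the input shown above (note Reduce input groups=9: eight distinct records plus the blank lines). A second issue: KeyValueTextInputFormat splits each line at the first tab and hands the mapper Text keys and Text values, so even a correctly overridden map(LongWritable, Text, Context) would never see whole lines. Below is a minimal sketch of the mapper against the new API, assuming the driver drops the setInputFormatClass(KeyValueTextInputFormat.class) line so the default TextInputFormat delivers (LongWritable offset, Text whole line) pairs:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: the same per-record aggregation, written against the new API
// so that map() genuinely overrides Mapper.map and is actually invoked.
class MapperAggregate extends Mapper<LongWritable, Text, Text, Text> {
  private final Text outKey = new Text();
  private final Text outValue = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] cols = value.toString().split("\t");   // bag1, bag2, bag3, label
    if (cols.length < 4) return;                    // skip blank or short lines

    double sum = 0, count = 0;                      // average of the first bag
    for (String s : cols[0].split(",")) { sum += Double.parseDouble(s); count++; }

    double max = Double.NEGATIVE_INFINITY;          // maximum of the second bag
    for (String s : cols[1].split(",")) max = Math.max(max, Double.parseDouble(s));

    double min = Double.POSITIVE_INFINITY;          // minimum of the third bag
    for (String s : cols[2].split(",")) min = Math.min(min, Double.parseDouble(s));

    outKey.set(cols[3]);                                      // the label
    outValue.set((sum / count) + "\t" + max + "\t" + min);    // avg, max, min
    context.write(outKey, outValue);
  }
}

The @Override annotation is the cheap insurance here: with it, the compiler rejects any method whose signature does not actually override Mapper.map, which would have caught this mismatch at compile time.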
