
Cannot set up userlogs in Sandbox cluster

Greetings Cloudera Users!

 

Please help me solve a simple problem. I have two Hadoop setups: the Cloudera Sandbox and my own 3-node cluster installation. In both cases I run the same MapReduce job, and in both cases I face two issues:

 

1) I cannot see the userlogs that my mapper prints.

2) For some reason Hadoop runs a reducer as part of my job, even though I don't have a reducer in my code!
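
On issue 2: my understanding is that when no reducer class is set, MapReduce still runs the default identity Reducer with a single reduce task, and a truly map-only job needs the reduce task count set to zero explicitly. I have not done that anywhere in the driver code below; a minimal sketch of what I assume would be needed (on my own Job object) is:

job.setNumReduceTasks(0); // my assumption: zero reduce tasks should make the job map-only

Is that the right way to disable the reduce phase, or is something else pulling it in?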

 

Could you please help me with this?

 

I have noticed that the default log4j.properties file has no appenders attached to the root logger other than the console appender. So I added the RFA appender to the root logger, but that did not help.
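
For reference, this is roughly the change I made (a sketch assuming the stock Hadoop log4j.properties, where the RFA appender is already defined as a RollingFileAppender; only the root logger line is my edit):

# root logger originally pointed at the console appender only
hadoop.root.logger=INFO,console,RFA

# already present in the default file
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}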

 

Here is the code of my program:

package hadoop.basics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Root on 1/21/2015.
 */
public class SimpleLineMapper extends Configured implements Tool {

    static Logger logger = Logger.getLogger(SimpleLineMapper.class);

    // first job's mapper: counts the records in each input split and emits (recordCount, splitStart)
    public static class MaxSplitCountMapper extends Mapper<LongWritable, Text, IntWritable, LongWritable> {
        @Override
        public void run(Context context) throws IOException, InterruptedException {

            FileSplit split = (FileSplit) context.getInputSplit();
            long splitIndex = split.getStart();

            int rowCount = 0;
            setup(context);
            IntWritable value = null;
            while (context.nextKeyValue()) {
                rowCount++;
            }

            // this is one of the log lines I expect to see in the userlogs
            logger.info("counted " + rowCount + " for " + splitIndex);

            context.write(new IntWritable(rowCount), new LongWritable(splitIndex));
            cleanup(context);
        }
    }

    // second job's mapper: uses the per-split counts from the first job to assign a global line number to every record
    public static class LineNumberMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        Map<Long, Integer> splitToRowCountMap = new HashMap<Long, Integer>();
        int rowCount = 0;

        @Override
        public void setup(Context context) {
            // read the first job's sequence-file output and map each split's start offset
            // to the cumulative row count of the splits before it
            SequenceFile.Reader sequenceFileReader = null;
            try {
                SequenceFile.Reader.Option filePath = SequenceFile.Reader.file(new Path("\\output_tmp\\part-r-00000"));
                sequenceFileReader = new SequenceFile.Reader(context.getConfiguration(), filePath);

                IntWritable key = (IntWritable) ReflectionUtils.newInstance(
                        sequenceFileReader.getKeyClass(), context.getConfiguration());
                LongWritable value = (LongWritable) ReflectionUtils.newInstance(
                        sequenceFileReader.getValueClass(), context.getConfiguration());

                int currentSplitRowCount = 0;
                int prevSplitLineOffset = 0;
                int prevSplitRowCount = 0;
                while (sequenceFileReader.next(key, value)) {

                    currentSplitRowCount = prevSplitLineOffset + prevSplitRowCount;
                    splitToRowCountMap.put(value.get(), currentSplitRowCount);
                    prevSplitRowCount = key.get();
                    prevSplitLineOffset = currentSplitRowCount;

                    logger.info(value.get() + " " + currentSplitRowCount);
                    System.out.println(value.get() + " " + currentSplitRowCount);
                }

            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                IOUtils.closeStream(sequenceFileReader);
            }

        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            FileSplit split = (FileSplit) context.getInputSplit();
            long splitIndex = split.getStart();

            logger.info("indexes for " + splitIndex + " is " + splitToRowCountMap.get(splitIndex) + " size is " + splitToRowCountMap.size());

            // global line number = lines before this split + line number within the split
            int lineOffset = splitToRowCountMap.get(splitIndex);

            rowCount++;

            context.write(new IntWritable(lineOffset + rowCount), value);

        }
    }

    @Override
    public int run(String[] args) throws Exception {
        int cj = runRowCountJob(args);
        int lj = runLineNumberJob(args);
        return cj * lj > 0 ? 1 : -1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SimpleLineMapper(), args);
        logger.info(new Path(".").toString());
        System.exit(exitCode);
    }

    int runLineNumberJob(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.jobtracker.staging.root.dir", args[2]);
        conf.set("mapred.local.dir", args[3]);
        conf.set("fs.local.block.size", 1000000 + "");
        Job job = new Job(conf, "Distributed Row Number Counter");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_final"));
        job.setMapperClass(LineNumberMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(SimpleLineMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    int runRowCountJob(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.jobtracker.staging.root.dir", args[2]);
        conf.set("mapred.local.dir", args[3]);
        conf.set("fs.local.block.size", 1000000 + "");
        Job job = new Job(conf, "Max split row number Counter");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_tmp"));
        job.setMapperClass(MaxSplitCountMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setJarByClass(SimpleLineMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
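
In case it helps: I run the program with four arguments, which map to the driver code above as follows: args[0] is the input path, args[1] is the output path prefix (the code appends "_tmp" for the first job and "_final" for the second), args[2] is the staging root dir, and args[3] is the local dir.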