
Cannot set up userlogs in Sandbox cluster


Greetings Cloudera Users!

 

Please help me solve a simple problem. I have two Hadoop setups: the Cloudera Sandbox and my own 3-node cluster installation. In both cases I run the same MapReduce job, and in both cases I face two issues:

 

1) I cannot see the userlogs that my mapper prints.

2) For some reason Hadoop runs a reducer as part of my job, even though I don't have a reducer in my code!

 

Could you please help me with this?

 

I have noticed that the default log4j.properties file has no appender attached to the root logger other than the console appender, so I added the RFA (rolling file) appender to the root logger. Unfortunately, that did not help.
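
In case it helps, the change I made in log4j.properties was roughly the following (quoted from memory, so the exact file name and sizes may differ on your side; the point is that RFA is a RollingFileAppender attached to the root logger):

log4j.rootLogger=${hadoop.root.logger}, RFA

log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
log4j.appender.RFA.MaxFileSize=256MB
log4j.appender.RFA.MaxBackupIndex=20
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n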

 

Here is the code of my program:

package hadoop.basics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Root on 1/21/2015.
 */
public class SimpleLineMapper extends Configured implements Tool {

    static Logger logger = Logger.getLogger(SimpleLineMapper.class);

    // Mapper of the first job: counts the rows in each input split and
    // emits (rowCount, splitStartOffset).
    public static class MaxSplitCountMapper extends Mapper<LongWritable, Text, IntWritable, LongWritable> {
        @Override
        public void run(Context context) throws IOException, InterruptedException {

            FileSplit split = (FileSplit) context.getInputSplit();
            long splitIndex = split.getStart();

            int rowCount = 0;
            setup(context);
            IntWritable value = null;
            while (context.nextKeyValue()) {
                rowCount++;
            }

            logger.info("counted " + rowCount + " for " + splitIndex);

            context.write(new IntWritable(rowCount), new LongWritable(splitIndex));
            cleanup(context);
        }
    }

    // Mapper of the second job: reads the per-split row counts produced by the
    // first job and assigns a global line number to every record.
    public static class LineNumberMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        Map<Long, Integer> splitToRowCountMap = new HashMap<Long, Integer>();
        int rowCount = 0;

        @Override
        public void setup(Context context) {
            SequenceFile.Reader sequenceFileReader = null;
            try {
                SequenceFile.Reader.Option filePath = SequenceFile.Reader.file(new Path("\\output_tmp\\part-r-00000"));
                sequenceFileReader = new SequenceFile.Reader(context.getConfiguration(), filePath);

                IntWritable key = (IntWritable) ReflectionUtils.newInstance(
                        sequenceFileReader.getKeyClass(), context.getConfiguration());
                LongWritable value = (LongWritable) ReflectionUtils.newInstance(
                        sequenceFileReader.getValueClass(), context.getConfiguration());

                // Running total: for each split start offset, record how many
                // lines were seen in all previously read splits.
                int currentSplitRowCount = 0;
                int prevSplitLineOffset = 0;
                int prevSplitRowCount = 0;
                while (sequenceFileReader.next(key, value)) {

                    currentSplitRowCount = prevSplitLineOffset + prevSplitRowCount;
                    splitToRowCountMap.put(value.get(), currentSplitRowCount);
                    prevSplitRowCount = key.get();
                    prevSplitLineOffset = currentSplitRowCount;

                    logger.info(value.get() + " " + currentSplitRowCount);
                    System.out.println(value.get() + " " + currentSplitRowCount);
                }

            } catch (Exception e) {
                e.printStackTrace();

            } finally {
                IOUtils.closeStream(sequenceFileReader);
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            FileSplit split = (FileSplit) context.getInputSplit();
            long splitIndex = split.getStart();

            logger.info("indexes for " + splitIndex + " is " + splitToRowCountMap.get(splitIndex) + " size is " + splitToRowCountMap.size());

            int lineOffset = splitToRowCountMap.get(splitIndex);

            rowCount++;

            context.write(new IntWritable(lineOffset + rowCount), value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Chain the two jobs: count rows per split, then number the lines.
        int cj = runRowCountJob(args);
        int lj = runLineNumberJob(args);
        return cj * lj > 0 ? 1 : -1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SimpleLineMapper(), args);
        logger.info(new Path(".").toString());
        System.exit(exitCode);
    }

    int runLineNumberJob(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.jobtracker.staging.root.dir", args[2]);
        conf.set("mapred.local.dir", args[3]);
        conf.set("fs.local.block.size", 1000000 + "");
        Job job = new Job(conf, "Distributed Row Number Counter");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_final"));
        job.setMapperClass(LineNumberMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(SimpleLineMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    int runRowCountJob(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.jobtracker.staging.root.dir", args[2]);
        conf.set("mapred.local.dir", args[3]);
        conf.set("fs.local.block.size", 1000000 + "");
        Job job = new Job(conf, "Max split row number Counter");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "_tmp"));
        job.setMapperClass(MaxSplitCountMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setJarByClass(SimpleLineMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
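
For completeness, I launch the job roughly like this (the jar name and the paths below are placeholders, not my real ones):

hadoop jar line-numbering.jar hadoop.basics.SimpleLineMapper /user/me/input /user/me/output /user/me/staging /tmp/mapred/local

where args[0] is the input directory, args[1] is the output prefix (the two jobs append "_tmp" and "_final" to it), args[2] is the staging root directory and args[3] is the local directory.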