
Read sequence file in MapReduce

Contributor

Hi everybody.

Today I ran into a strange situation that is even hard to explain.

I use MapReduce to read a sequence file in which every row represents a JSON entry. To my big surprise, some rows, which are SHORTER than the previous ones, contain chunks of data from the previous rows.

For example:

{"id":"121B5A8FE08B1F13E050007F010016F6","data":"foo1=603; foo2=31; foo14=foo15; foo9=0; foo10=foo39; foo3 foo28=foo29; foo30 foo28=foo31; foo3 foo26=foo29; foo27=foo32; foo25=foo32; foo19=180,000; foo44=foo24 ","docId":"EF989D8481C4EE9CE040600AB8000D36","foo21":"ins603bh","ts":1389341504951,"foo13":"603","docType":"foo17","operationType":"Modify"}



{"id":"121B5A8FE08C1F13E050007F010016F6","data":"foo1=613;foo3=foo47;foo40=foo35;foo41=4 foo45 foo46;foo36;foo37=0;foo38=foo20;foo33=foo20;foo34;foo12=foo42,foo19 foo43=715554;","docId":"EF9A4646E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08D1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=1001; foo10=foo11; foo12=foo20; foo19=142,211,020","docId":"EF9A2796D8BC2F01E040600AB8002F81","foo21":"foo22","ts":1389341549845,"foo13":"619","docType":"foo23","operationType":"Create"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08E1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=0901; foo10=foo11; foo12=foo20; foo19=32,937","docId":"EF9A2796D8C02F01E040600AB8002F81","foo21":"foo22","ts":1389341549866,"foo13":"619","docType":"foo23","operationType":"Create"}ate"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08F1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=0202; foo10=foo39; foo12=foo20; foo19=80,000,000","docId":"EF9A2796D8C72F01E040600AB8002F81","foo21":"foo22","ts":1389341549895,"foo13":"619","docType":"foo23","operationType":"Create"}e":"Create"}ate"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE0901F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=M0; foo7=foo8; foo9=1001; foo10=foo11; foo12=foo20; foo19=142,211,020","docId":"EF9A2796D8CB2F01E040600AB8002F81","foo21":"foo22","ts":1389341549929,"foo13":"619","docType":"foo23","operationType":"Create"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

As you can see, starting from the second JSON item, we get invalid JSON with extra text appended after the closing brace '}':

"51,"foo13":"603","docType":"foo17","operationType":"Modify"}"

(which is actually a chunk of the tail of the first record).

It looks like there is some kind of byte buffer somewhere in MapReduce that is used to read the sequence file data, and it is not cleared after each record. When the following record is shorter than the previous one, we get chunks of the old data.

Can anyone please help me with this issue?


6 REPLIES

Contributor

My job is rather simple. It just reads the input and emits everything to output:

JobConf jobConf = new JobConf(getConf(), ArchiveMergeJob.class);
jobConf.setJobName(JOB_NAME);

Job job = Job.getInstance(jobConf);
job.setJarByClass(ArchiveMergeRunner.class);

SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(SequenceFileInputFormat.class);

job.setMapperClass(ArchiveMergeMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ArchiveMergeReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);

TextOutputFormat.setOutputPath(job, new Path(args[1]));

return job.waitForCompletion(true) ? 0 : 1;


@Dennis Fridlyand, would you please also share the code for the ArchiveMergeMapper and ArchiveMergeReducer classes?

Contributor

Hi, @Chris Nauroth

Here's the implementation of the mapper and reducer. As I said, everything is trivial.

public class ArchiveMergeMapper extends Mapper<LongWritable, BytesWritable, Text, Text> {

  private final Logger log = Logger.getLogger(ArchiveMergeMapper.class);
  private Text outKey = new Text();

  @Override
  protected void map(LongWritable key, BytesWritable value,
      Mapper<LongWritable, BytesWritable, Text, Text>.Context context) throws IOException,
      InterruptedException {

    final String json = new String(value.getBytes(), "UTF-8");

    IMyInterface myObj = MyUtil.parseJson(json);
    if (myObj.getId() != null) {
      outKey.set(myObj.getId());
      context.write(outKey, new Text(json));
    } else {
      log.warn("Incorrect string" + json);
    }
  }
}

public class ArchiveMergeReducer extends Reducer<Text, Text, LongWritable, Text> {

  private LongWritable keyLW = new LongWritable(1);

  @Override
  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, LongWritable, Text>.Context context)
      throws IOException, InterruptedException {

    if (values.iterator().hasNext()) {
      context.write(keyLW, values.iterator().next());
    }
  }
}

@Dennis Fridlyand, thank you for sharing the mapper and reducer code. I think I've spotted a bug in the mapper that I can help with.
  protected void map(LongWritable key, BytesWritable value,
      Mapper<LongWritable, BytesWritable, Text, Text>.Context context) throws IOException,
      InterruptedException {
    final String json = new String(value.getBytes(), "UTF-8");

Here, value is an instance of BytesWritable. A BytesWritable is a wrapper over an underlying byte array. That underlying buffer may be reused multiple times to represent different records. The actual length of the data within the buffer that is considered valid is tracked separately using a numeric size field. For more details, please see the source code for BytesWritable.

By calling value.getBytes(), the mapper code is accessing the raw underlying buffer. This buffer might still contain trailing data from a previous record. Only the data in the buffer up to the length returned by value.getLength() is truly valid.
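
To make the buffer reuse concrete, here is a small standalone sketch of my own, purely as an illustration (the class name is made up, and it assumes a Hadoop release recent enough to have copyBytes()):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.BytesWritable;

public class BytesWritableReuseDemo {

  public static void main(String[] args) {
    BytesWritable value = new BytesWritable();

    // First "record": a long payload sizes the backing buffer.
    byte[] longRecord =
        "{\"id\":1,\"data\":\"a rather long payload\"}".getBytes(StandardCharsets.UTF_8);
    value.set(longRecord, 0, longRecord.length);

    // Second "record": a shorter payload reuses the same backing buffer,
    // so bytes beyond getLength() still hold the tail of the long record.
    byte[] shortRecord = "{\"id\":2}".getBytes(StandardCharsets.UTF_8);
    value.set(shortRecord, 0, shortRecord.length);

    // Converts the entire backing array, including stale bytes (this is the bug).
    String raw = new String(value.getBytes(), StandardCharsets.UTF_8);

    // copyBytes() returns a fresh array holding only the first getLength() bytes.
    String copied = new String(value.copyBytes(), StandardCharsets.UTF_8);

    System.out.println("raw    = " + raw);    // short record plus leftover tail of the long one
    System.out.println("copied = " + copied); // {"id":2}
  }
}

The backing array only ever grows, so a shorter record never overwrites the tail left behind by a longer one.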

I recommend switching to the copyBytes() method, which contains additional logic to copy only the currently valid bytes of the underlying buffer. That means making the following change in the mapper code:

    final String json = new String(value.copyBytes(), "UTF-8");

Would you please try that?
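
As a side note, if you want to avoid the extra array copy that copyBytes() makes for every record, an equivalent fix (just a variant I am sketching, not something required here) is to bound the conversion explicitly with getLength():

    final String json = new String(value.getBytes(), 0, value.getLength(), "UTF-8");

Either way, only the valid portion of the reused buffer ends up in the JSON string.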

Contributor

@Chris Nauroth, thank you very much! You saved me a lot of time, and your solution should finally solve my problem.

I had a feeling that the issue was with buffers, but I didn't guess that the value object might be reused in the mapper...


@Dennis Fridlyand, I'm glad to hear this helped!