Read sequence file in MapReduce

Hi everybody.

Today I ran into a strange situation that is hard even to explain.

I use MapReduce to read a sequence file in which every record is a JSON entry. To my surprise, some records that are SHORTER than the previous ones contain chunks of data from earlier records.

For example:

{"id":"121B5A8FE08B1F13E050007F010016F6","data":"foo1=603; foo2=31; foo14=foo15; foo9=0; foo10=foo39; foo3 foo28=foo29; foo30 foo28=foo31; foo3 foo26=foo29; foo27=foo32; foo25=foo32; foo19=180,000; foo44=foo24 ","docId":"EF989D8481C4EE9CE040600AB8000D36","foo21":"ins603bh","ts":1389341504951,"foo13":"603","docType":"foo17","operationType":"Modify"}



{"id":"121B5A8FE08C1F13E050007F010016F6","data":"foo1=613;foo3=foo47;foo40=foo35;foo41=4 foo45 foo46;foo36;foo37=0;foo38=foo20;foo33=foo20;foo34;foo12=foo42,foo19 foo43=715554;","docId":"EF9A4646E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08D1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=1001; foo10=foo11; foo12=foo20; foo19=142,211,020","docId":"EF9A2796D8BC2F01E040600AB8002F81","foo21":"foo22","ts":1389341549845,"foo13":"619","docType":"foo23","operationType":"Create"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08E1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=0901; foo10=foo11; foo12=foo20; foo19=32,937","docId":"EF9A2796D8C02F01E040600AB8002F81","foo21":"foo22","ts":1389341549866,"foo13":"619","docType":"foo23","operationType":"Create"}ate"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE08F1F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=33; foo7=foo8; foo9=0202; foo10=foo39; foo12=foo20; foo19=80,000,000","docId":"EF9A2796D8C72F01E040600AB8002F81","foo21":"foo22","ts":1389341549895,"foo13":"619","docType":"foo23","operationType":"Create"}e":"Create"}ate"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

{"id":"121B5A8FE0901F13E050007F010016F6","data":"foo1=619; foo3=foo5; foo6=M0; foo7=foo8; foo9=1001; foo10=foo11; foo12=foo20; foo19=142,211,020","docId":"EF9A2796D8CB2F01E040600AB8002F81","foo21":"foo22","ts":1389341549929,"foo13":"619","docType":"foo23","operationType":"Create"}6E84E3C73E040600AB8003289","foo21":"64_613","ts":1389341548640,"foo13":"613","docType":"foo18","operationType":"Create"}51,"foo13":"603","docType":"foo17","operationType":"Modify"}

As you can see, starting from the second JSON item, we get invalid JSON with extra text appended after the closing brace '}':

"51,"foo13":"603","docType":"foo17","operationType":"Modify"}"

(which is actually a chunk of the tail of the first record).

It looks as if there is a byte buffer somewhere in MapReduce that is used to read the sequence file data and is not cleared after each record. When the next record is shorter than the previous one, we get leftover chunks of old data.

Can anyone help me with this issue?
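The suspected mechanism can be reproduced without Hadoop. The sketch below is a simplified model, not Hadoop code: `GrowOnlyBuffer` is a hypothetical stand-in for a value object that the record reader reuses for every record. Like Hadoop's `BytesWritable`, it exposes the whole backing array through `getBytes()` and the size of the current record through `getLength()`, and it does not clear bytes beyond that length. Decoding the whole array, as `new String(value.getBytes(), "UTF-8")` does, yields the same kind of trailing garbage shown above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified, hypothetical model of a reused value object (NOT Hadoop's BytesWritable):
// the backing array only grows, and only the first getLength() bytes belong to the
// current record.
final class StaleBufferDemo {

    static final class GrowOnlyBuffer {
        private byte[] bytes = new byte[0];
        private int length = 0;

        // Load the next record into the same object, as a reader reusing the value would.
        void set(byte[] record) {
            if (record.length > bytes.length) {
                // Grow the backing array, keeping the old contents.
                bytes = Arrays.copyOf(bytes, record.length);
            }
            System.arraycopy(record, 0, bytes, 0, record.length);
            length = record.length; // bytes past 'length' are left untouched
        }

        byte[] getBytes() { return bytes; }  // the whole backing array
        int getLength() { return length; }   // size of the current record
    }

    // Mirrors new String(value.getBytes(), "UTF-8"): decodes the entire backing array.
    static String decodeWholeArray(GrowOnlyBuffer buf) {
        return new String(buf.getBytes(), StandardCharsets.UTF_8);
    }

    // Decodes only the bytes that belong to the current record.
    static String decodeValidPrefix(GrowOnlyBuffer buf) {
        return new String(buf.getBytes(), 0, buf.getLength(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        GrowOnlyBuffer value = new GrowOnlyBuffer();
        value.set("{\"id\":\"1\",\"data\":\"long\"}".getBytes(StandardCharsets.UTF_8));
        value.set("{\"id\":\"2\"}".getBytes(StandardCharsets.UTF_8));

        System.out.println(decodeWholeArray(value));  // {"id":"2"}"data":"long"}
        System.out.println(decodeValidPrefix(value)); // {"id":"2"}
    }
}
```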

My job is rather simple. It just reads the input and emits everything to the output:

JobConf jobConf = new JobConf(getConf(), ArchiveMergeJob.class);
jobConf.setJobName(JOB_NAME);

Job job = Job.getInstance(jobConf);
job.setJarByClass(ArchiveMergeRunner.class);

SequenceFileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(SequenceFileInputFormat.class);

job.setMapperClass(ArchiveMergeMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(ArchiveMergeReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);

TextOutputFormat.setOutputPath(job, new Path(args[1]));

return job.waitForCompletion(true) ? 0 : 1;

@Dennis Fridlyand, would you please also share the code for the ArchiveMergeMapper and ArchiveMergeReducer classes?

Hi, @Chris Nauroth

Here's the implementation of the mapper and reducer. As I said, everything is trivial.

public class ArchiveMergeMapper extends Mapper<LongWritable, BytesWritable, Text, Text> {


  private final Logger log = Logger.getLogger(ArchiveMergeMapper.class);
  private Text outKey = new Text();


  @Override
  protected void map(LongWritable key, BytesWritable value,
      Mapper<LongWritable, BytesWritable, Text, Text>.Context context) throws IOException,
      InterruptedException {


    final String json = new String(value.getBytes(), "UTF-8");


    IMyInterface myObj = MyUtil.parseJson(json);
    if (myObj.getId() != null) {
      outKey.set(myObj.getId());
      context.write(outKey, new Text(json));
    } else {
      log.warn("Incorrect string" + json);
    }
  }
}
public class ArchiveMergeReducer extends Reducer<Text, Text, LongWritable, Text> {


  private LongWritable keyLW = new LongWritable(1);


  @Override
  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, LongWritable, Text>.Context context)
      throws IOException, InterruptedException {


    if (values.iterator().hasNext()) {
      context.write(keyLW, values.iterator().next());
    }
  }
}

@Chris Nauroth, thank you very much! You saved me a lot of time, and your solution should finally solve my problem.

I had a feeling that the issue was with buffers, but I didn't guess that the value object might be reused in the mapper...
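The text of the accepted answer is not shown in this thread, but the exchange points at the cause: the mapper's value object is reused, and `BytesWritable.getBytes()` returns the backing array, which can be longer than the current record. A fix consistent with that is to decode only the first `value.getLength()` bytes, e.g. `new String(value.getBytes(), 0, value.getLength(), StandardCharsets.UTF_8)`, or, in Hadoop versions that have it, to use `value.copyBytes()`, which returns a copy exactly as long as the data. The sketch below stays independent of Hadoop so it can run on its own; `RecordDecoding`, `decodeRecord` and `copyRecord` are hypothetical helpers that take the values the mapper would get from `value.getBytes()` and `value.getLength()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helpers illustrating the length-bounded decode.
// In ArchiveMergeMapper.map() they would be called as
//   decodeRecord(value.getBytes(), value.getLength())
// instead of new String(value.getBytes(), "UTF-8").
final class RecordDecoding {

    private RecordDecoding() {}

    // Decode only the bytes of the current record; anything past 'length'
    // in the backing array may be left over from an earlier record.
    static String decodeRecord(byte[] backing, int length) {
        return new String(backing, 0, length, StandardCharsets.UTF_8);
    }

    // Mirrors what BytesWritable.copyBytes() returns: an independent array of
    // exactly 'length' bytes, safe to keep after the buffer is refilled.
    static byte[] copyRecord(byte[] backing, int length) {
        return Arrays.copyOf(backing, length);
    }

    public static void main(String[] args) {
        // A backing array holding a 10-byte record followed by stale bytes.
        byte[] backing = "{\"id\":\"2\"}\"data\":\"long\"}".getBytes(StandardCharsets.UTF_8);
        int length = 10;

        System.out.println(decodeRecord(backing, length)); // {"id":"2"}

        byte[] copy = copyRecord(backing, length);
        backing[0] = 'X'; // simulate the reader overwriting the reused buffer
        System.out.println(new String(copy, StandardCharsets.UTF_8)); // {"id":"2"}
    }
}
```

Doing the conversion inside `map()`, before the next record is read, matters because the reader refills the same `BytesWritable` for the next record; holding on to `value.getBytes()` across calls would not be safe.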

@Dennis Fridlyand, I'm glad to hear this helped!