Support Questions

YARN/Map-Reduce: Different "Map input records" with the same job


New Contributor

Hi Team, I am shocked: it is possible to get different results by running the same job on the same data with YARN/Map-Reduce. I implemented filters to improve performance. The job finishes successfully and there is no indication of a bottleneck during execution. Could you help me understand this behavior?

What can I do to solve this problem?

How can I be sure that the result is correct?

Now the details:

  • I’m using HDP-2.3.0.0-2557
  • I’m using HBase
  • I used the internal counter "Map input records" from the Map-Reduce framework to evaluate the result

The following question should be answered: how many columns start with a defined prefix? The simplified source code is below.

  • It is a full table scan
  • There is a QualifierFilter to reduce the amount of data
  1. Result of the first run (with QualifierFilter), 16/09/14 07:43:13: Map-Reduce Framework Map input records = 33319329, de.kvs.mr.Counter COUNT = 33319329 --> The framework counter and the counter "COUNT" have the same value.
  2. Result of the second run (with QualifierFilter), 16/09/14 07:58:16: Map-Reduce Framework Map input records = 33363903, de.kvs.mr.Counter COUNT = 33363903 --> The framework counter and the counter "COUNT" again have the same value, but the totals of the first and second run differ!
  3. Result of the third run (without QualifierFilter; I removed it), 16/09/14 08:14:18: Map-Reduce Framework Map input records = 34455826, de.kvs.mr.Counter COUNT = 34455826 --> The framework counter and the counter "COUNT" have the same value.

The three runs see a different number of rowkeys. I suspect there is a problem with the filters.

MyCounterClass:

public class MyCounterClass extends Configured implements Tool {
  @Override
  public int run(final String[] args) throws Exception {
    Configuration jobConf = getConf();
    jobConf.set("mapreduce.map.speculative", "false");
    final Job job = Job.getInstance(jobConf);
    job.setJobName(getClass().getSimpleName());
    job.setJarByClass(MyCounterClass.class);

    final Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf"));
    scan.setCaching(500);
    scan.setCacheBlocks(false);

    // relevant columns (note: the originally posted regex was missing the "(" before V_e's digits)
    final QualifierFilter qualifierFilter =
        new QualifierFilter(CompareOp.EQUAL,
            new RegexStringComparator("V_i(\\d+)|V_e(\\d+)|V_k(\\d+)"));
    scan.setFilter(qualifierFilter);

    // Mapper
    TableMapReduceUtil.initTableMapperJob("myTable", scan, MyCounterMapper.class,
        ImmutableBytesWritable.class, IntWritable.class, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    // no Reducer
    job.setNumReduceTasks(0);
    ...
  }

  public static void main(final String args[]) throws Exception {
    System.exit(ToolRunner.run(new MyCounterClass(), args));
  }
}
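Before wiring a pattern into RegexStringComparator, it can be sanity-checked locally with plain java.util.regex. This is a hypothetical helper class (not part of the job code), using the corrected pattern; to my understanding RegexStringComparator uses find() (substring) semantics, which is mirrored here:

```java
import java.util.regex.Pattern;

// Local sanity check for the qualifier pattern before handing it to
// RegexStringComparator (hypothetical helper, not part of the job code).
public class QualifierRegexCheck {
    // Corrected pattern: the originally posted string lacked the "(" before V_e's digits.
    static final Pattern QUALIFIER = Pattern.compile("V_i(\\d+)|V_e(\\d+)|V_k(\\d+)");

    // RegexStringComparator matches anywhere in the string (find() semantics,
    // as far as I know), so find() is used here rather than matches().
    static boolean matches(String qualifier) {
        return QUALIFIER.matcher(qualifier).find();
    }

    public static void main(String[] args) {
        System.out.println(matches("V_i42")); // true
        System.out.println(matches("X_q42")); // false
    }
}
```

Running a handful of known qualifiers through such a check quickly catches unbalanced parentheses or escaping mistakes before a full table scan is started.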

MyCounterMapper:

public class MyCounterMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
  @Override
  public void map(final ImmutableBytesWritable row, final Result result, final Context context)
      throws IOException, InterruptedException {
    context.getCounter(Counter.COUNT).increment(1);
  }
}

Counter:

public enum Counter {
  COUNT;
}
3 REPLIES

Re: YARN/Map-Reduce: Different "Map input records" with the same job

Contributor

Hi @Stefan Berner,

Could you please run the third case (without QualifierFilter) again and check whether the result is the same? That will rule out an issue with the scan itself.

Also, could you please export all the rowkeys of the first and second run, then diff them to see which keys are new? Randomly select several of the new rowkeys, fetch all of their cells from HBase, and check whether the timestamps are actually later than the first run.
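The suggested diff can be sketched with plain Java collections, assuming the rowkeys of each run have already been exported (class and method names here are illustrative only):

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch: compare the rowkey sets of two runs and report
// the keys present in one run but missing from the other.
public class RowkeyDiff {
    // Returns the rowkeys that appear in 'a' but not in 'b', sorted.
    static Set<String> missingFrom(Set<String> a, Set<String> b) {
        Set<String> diff = new TreeSet<>(a);
        diff.removeAll(b);
        return diff;
    }

    public static void main(String[] args) {
        Set<String> run1 = new TreeSet<>(Set.of("r1", "r2", "r3"));
        Set<String> run2 = new TreeSet<>(Set.of("r1", "r3"));
        System.out.println(missingFrom(run1, run2)); // [r2]
    }
}
```

For tens of millions of keys the same idea works with sorted files and a streaming merge instead of in-memory sets.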

Thanks


Re: YARN/Map-Reduce: Different "Map input records" with the same job

New Contributor

Hi @Victor Xu,

Thank you for your quick response.

The third run (without QualifierFilter) always returns the same result; I tried it again.

Last week I had the same idea and exported the rowkeys. The missing rowkeys differ randomly between the first and the second run, but they are concentrated in one key range.

I wrote a new job that scans only the range of the missing rowkeys. The result for this range (with and without QualifierFilter) is correct.

I ran the first, second, and third configurations again. The third always produces the same result, but the first and the second differ randomly.

The data has not been changed! (The timestamps aren't later than the first run.)


Re: YARN/Map-Reduce: Different "Map input records" with the same job

New Contributor

Hi Team,

I have made some changes and done further analysis.

The problem description:

No data was modified during this analysis.

A YARN job with a defined filter (QualifierFilter) produces different results (runs 5, 6, 7)!

The same YARN job without a filter (QualifierFilter) produces identical results (runs 1, 2, 11)!

The jobs finish successfully and there is no indication of a bottleneck during execution.

The solution is to increase the mapreduce.map.memory.mb from 2048 to 3072.

The same YARN job with a defined filter (QualifierFilter) and the setting mapreduce.map.memory.mb=3072 produces identical results (runs 3, 4, 8, 9, 10)!

The default setting for mapreduce.map.memory.mb is 2048.

Is there a RAM bottleneck during job execution when a filter is used?

How can I detect this bottleneck to prevent wrong results?

Please, can anyone help me?

Thanks

Here are the code and the case studies.

MrCount:

public class MrCount extends Configured implements Tool {
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("cf");
  private static final String TABLE = "myTable";
  private static final String QUALIFIER_REGEX = "V_iid_V";
  // Logger for the log.info() call below; assumes commons-logging (Log/LogFactory), as bundled with Hadoop/HBase.
  private static final Log log = LogFactory.getLog(MrCount.class);

  @Override
  public int run(final String[] args) throws Exception {
    final boolean filter = Integer.parseInt(args[0]) == 1;
    Configuration jobConf = getConf();
    jobConf.set("mapreduce.map.speculative", "false");


    // For different settings.
    jobConf.set("mapreduce.map.memory.mb", "2048");
    //jobConf.set("mapreduce.map.memory.mb", "3072");

    final Job job = Job.getInstance(jobConf);

    String jobname;
    if (filter) {
      jobname = getClass().getSimpleName() + " with Filter";
    } else {
      jobname = getClass().getSimpleName() + " without Filter";
    }

    job.setJobName(jobname);
    job.setJarByClass(MrCount.class);
    log.info("########### " + jobname + " ############");
    final Scan scan = new Scan();
    scan.addFamily(COLUMN_FAMILY);
    scan.setCaching(500);
    scan.setCacheBlocks(false);

    if (filter) {
      scan.setFilter(
          new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(QUALIFIER_REGEX)));
    }
    // Mapper
    TableMapReduceUtil.initTableMapperJob(TABLE, scan, CountMapper.class,
        ImmutableBytesWritable.class, IntWritable.class, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    TableMapReduceUtil.addDependencyJars(job);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(final String args[]) throws Exception {
    System.exit(ToolRunner.run(new MrCount(), args));
  }
}

CountMapper:

public class CountMapper extends TableMapper<ImmutableBytesWritable, IntWritable> {
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("cf");

  @Override
  public void map(final ImmutableBytesWritable row, final Result result, final Context context)
      throws IOException, InterruptedException {

    context.getCounter(Counter.COUNT_ROWKEY).increment(1);

    for (final Map.Entry<byte[], byte[]> column : result.getFamilyMap(COLUMN_FAMILY).entrySet()) {
      final String qualifier = Bytes.toString(column.getKey());

      if (qualifier.startsWith("V_iid_V")) {
        context.getCounter(Counter.COUNT_V_iid_V).increment(1);
      }
    }
  }
}
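One subtle difference worth noting between the filter and the mapper: as far as I know, RegexStringComparator matches the pattern anywhere in the qualifier (find() semantics), while the mapper uses startsWith(), a strict prefix check. With the literal pattern "V_iid_V" the two disagree for qualifiers that merely contain the string. A minimal local check of that difference:

```java
import java.util.regex.Pattern;

// Demonstrates substring matching (find(), as RegexStringComparator is
// understood to behave) versus the mapper's startsWith() prefix check.
public class MatchSemantics {
    static final Pattern P = Pattern.compile("V_iid_V");

    public static void main(String[] args) {
        String q = "X_V_iid_V_1"; // contains, but does not start with, the pattern
        System.out.println(P.matcher(q).find());     // true  -> would pass the filter
        System.out.println(q.startsWith("V_iid_V")); // false -> not counted by the mapper
    }
}
```

Anchoring the regex with "^V_iid_V" would align the filter with the prefix semantics of the mapper.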

Counter:

public enum Counter {
  COUNT_ROWKEY, COUNT_V_iid_V;
}

case studies:

Nr | Execution time    | Job name               | mapreduce.map.memory.mb | Map input records | COUNT_ROWKEY | COUNT_V_iid_V | result is correct
 1 | 16/10/25 06:29:03 | MrCount without Filter | 3072                    | 34455826          | 34455826     | 33462943      | x
 2 | 16/10/25 06:40:50 | MrCount without Filter | 3072                    | 34455826          | 34455826     | 33462943      | x
 3 | 16/10/25 06:50:00 | MrCount with Filter    | 3072                    | 33431713          | 33431713     | 33462943      | x
 4 | 16/10/25 07:01:46 | MrCount with Filter    | 3072                    | 33431713          | 33431713     | 33462943      | x
 5 | 16/10/25 07:37:06 | MrCount with Filter    | 2048                    | 33387139          | 33387139     | 33418350      |
 6 | 16/10/25 08:08:58 | MrCount with Filter    | 2048                    | 33347191          | 33347191     | 33378388      |
 7 | 16/10/25 08:18:31 | MrCount with Filter    | 2048                    | 33319329          | 33319329     | 33350485      |
 8 | 16/10/25 08:29:57 | MrCount with Filter    | 3072                    | 33431713          | 33431713     | 33462943      | x
 9 | 16/10/25 08:46:29 | MrCount with Filter    | 3072                    | 33431713          | 33431713     | 33462943      | x
10 | 16/10/25 08:56:38 | MrCount with Filter    | 3072                    | 33431713          | 33431713     | 33462943      | x
11 | 16/10/25 09:03:43 | MrCount without Filter | 3072                    | 34455826          | 34455826     | 33462943      | x