Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

How can I get OrcInputFormat.setSearchArgument to work properly within my MapReduce Java program?

How can i get OrcInputFormat.setSearchArgument to work properly within my mapreduce java program?

New Contributor

I'm working on a MapReduce job written in Java that will run on a cluster of 6 machines. The job reads from an ORC file passed as input, searches for all matches of a particular phone number in the "called" field, and finally writes the result to HDFS. The ORC file is a table of 50,000 records ordered by the "called" field. The schema of the ORC file is shown below [schema attachment appears to be missing from this post], and the file is compressed with ZLIB. What I want to achieve is to pass only a small subset of the 50,000 input records to the mappers using OrcInputFormat.setSearchArgument — essentially a predicate pushdown on the input. The problem is that setSearchArgument doesn't work as expected and the mappers still process all 50,000 records. How can I fix this?

package reader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.apache.orc.mapreduce.OrcOutputFormat;
import java.io.IOException;


/**
 * MapReduce job that counts occurrences of a specific phone number in the
 * "called" column of an ORC input file, using an ORC SearchArgument so that
 * row groups which provably contain no match are skipped at read time
 * (predicate pushdown).
 *
 * <p>NOTE: ORC predicate pushdown works at row-group granularity (typically
 * 10,000 rows), not per-row, so the mapper still re-checks each record it
 * receives. With the input sorted on "called", most non-matching row groups
 * are eliminated before they reach the mappers.
 */
public class Main extends Configured implements Tool
{
  /**
   * The phone number being searched for. Used both in the SearchArgument
   * (row-group elimination) and in the mapper's per-row check.
   */
  private static final String TARGET_CALLED = "0111000000";

  /**
   * Emits (1, called) for every record whose "called" field matches
   * {@link #TARGET_CALLED} (case-insensitively).
   */
  public static class MyMapper extends Mapper<NullWritable, OrcStruct, LongWritable, Text>
  {
    // Reused across map() calls to avoid per-record allocations.
    private final LongWritable one = new LongWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(NullWritable key, OrcStruct value, Context context)
        throws IOException, InterruptedException
    {
      // Look the field up by name rather than the brittle positional index 6;
      // "called" is the 7th column of the declared schema, so behavior is
      // unchanged but the code no longer breaks if columns are reordered.
      Object called = value.getFieldValue("called");
      if (called != null && called.toString().equalsIgnoreCase(TARGET_CALLED))
      {
        word.set(called.toString());
        context.write(one, word);
      }
    }
  }

  /**
   * Counts how many matching records were emitted by the mappers and writes
   * (key, count).
   */
  public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, IntWritable>
  {
    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
      int counter = 0;
      for (Text ignored : values)
      {
        counter++;
      }
      context.write(key, new IntWritable(counter));
    }
  }

  /**
   * Configures and submits the job.
   *
   * @param args args[0] = ORC input path, args[1] = output path
   *             (after generic Hadoop options are stripped)
   * @return 0 on success, 1 on failure
   */
  @Override
  public int run(String[] args) throws Exception
  {
    Configuration conf = getConf();

    // Parse -D/-files/... generic options BEFORE the job snapshots the
    // configuration, so they actually take effect.
    args = new GenericOptionsParser(conf, args).getRemainingArgs();

    // BUG FIX: the original code called Job.getInstance(conf) FIRST and only
    // then applied setSearchArgument to 'conf'. Job.getInstance takes a
    // private COPY of the Configuration, so the SearchArgument never reached
    // the submitted job — which is why every mapper saw all 50,000 records.
    // The configuration must be fully populated before the Job is created.
    String[] columns = {"lenb", "locdate", "lochour", "normdate",
        "normhour", "caller", "called", "duration", "hash", "dates"};

    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
        .equals("called", PredicateLeaf.Type.STRING, TARGET_CALLED)
        .end()
        .build();

    OrcInputFormat.setSearchArgument(conf, sarg, columns);

    Job job = Job.getInstance(conf);

    job.setJarByClass(Main.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(IntWritable.class);

    OrcInputFormat.addInputPath(job, new Path(args[0]));
    OrcOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setInputFormatClass(OrcInputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception
  {
    long start = System.currentTimeMillis();

    int exitCode = ToolRunner.run(new Main(), args);

    long end = System.currentTimeMillis();
    // Integer division: whole seconds, matching the original output.
    System.out.println("Total time = " + (end - start) / 1000);

    System.exit(exitCode);
  }
}
Don't have an account?
Coming from Hortonworks? Activate your account here