Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Expert Contributor

Often times, it requires us to use the plain Map-Reduce for some special custom use cases. There are many examples about how to use pure text format using Text File Input formats, but when coming to process ORC files, there aren't any. The Apache ORC website is a very good place to start with. I had a custom requirement to read data from a Hive ORC table and do some special processing which is quite difficult to do in Hive UDF. Below is the Map-Reduce code for that using all latest Hadoop 2.7.3 API

Driver Code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class Main extends Configured implements Tool {
	public static void main(String[] args) throws Exception {
		
		int res =  ToolRunner.run(new Configuration(), new Main(),args);
		System.exit(res);
	}


	@Override
	public int run(String[] arg0) throws Exception {
		// TODO Auto-generated method stub
		
		Configuration conf = getConf();
		conf.set("mapreduce.job.inputformat.class", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
		conf.set("mapreduce.input.fileinputformat.inputdir", arg0[0]);
		conf.set("mapreduce.job.queuename", "platformeng");
		
		
		//Job job = new Job(conf,"Read ORC Files");
		Job job = Job.getInstance(conf,"Read ORC Files");
		job.setJarByClass(Main.class);
		job.setMapperClass(ORCReaderMapper.class);
		
		//job.setInputFormatClass(OrcInputFormat.class);
		
		job.setMapOutputKeyClass(NullWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		//job.setOutputKeyClass(NullWritable.class);
		//job.setOutputValueClass(Text.class);
		
		job.setOutputFormatClass(TextOutputFormat.class);
		
		MultipleInputs.addInputPath(job, new Path(arg0[0]), OrcNewInputFormat.class);
		//FileInputFormat.addInputPath(job, new Path(arg0[0]));
		FileInputFormat.setInputDirRecursive(job, true);
		
		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
		job.setNumReduceTasks(0);
		System.exit(job.waitForCompletion(true) ?0:1);
		
		
		
		return 0;
	}


}


Mapper Code:

import java.io.IOException;
import java.util.List;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class ORCReaderMapper extends
		Mapper<NullWritable, OrcStruct, NullWritable, Text> {


	FileSystem fs;
	TypeInfo typeInfo;
	ObjectInspector inspector;
	Text v;
	/*
	 * (non-Javadoc)
	 * 
	 * @see org.apache.hadoop.mapreduce.Mapper#map(java.lang.Object,
	 * java.lang.Object, org.apache.hadoop.mapreduce.Mapper.Context)
	 */
	@Override
	protected void map(NullWritable key, OrcStruct value,
			Mapper<NullWritable, OrcStruct, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		
		inspector = value.createObjectInspector(typeInfo);
		StructObjectInspector structObjectInspector = (StructObjectInspector) inspector;
		List columnValues = structObjectInspector.getStructFieldsDataAsList(value);
		
		String fileName = columnValues.get(0).toString();
		DateWritable eventDate =(DateWritable) columnValues.get(1);
		
                // <Your custom logic with the key and value pairs>
                v.set(filename + "  "+ eventDate.toString())
                context.write(NullWritable.get(), v);
				
		
	}


	/* (non-Javadoc)
	 * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
	 */
	@Override
	protected void setup(
			Mapper<NullWritable, OrcStruct, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		
		typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("struct<resource:string,evttime:date>");
		
		v = new Text();
		
		super.setup(context);
	}


}


The code is written using the mapreduce API and the deprecated mapred API is avoided. You may feel free to implement the data types of your choice. If you need a reducer phase, that should be straight forward as well.

Please note, the Split calculation is not straight forward as with text Input formats and are mostly driven by the number of ORC files in the directory. You might want to check the parallelism on the task that generates your source file to get more mapper parallelism in the map reduce code.

Below is some useful documentation:

https://orc.apache.org/docs/mapreduce.html

4,933 Views