package com.kgi.test.mapreduce.hcatalog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class DynamicPartitionTest extends Configured implements Tool {

    public static class Map extends Mapper<WritableComparable, HCatRecord, Text, Text> {

        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // Fields are read positionally: 0 = id, 1 = age, 2 = enc_type_c, 3 = app_status_c.
            String id = (String) value.get(0);
            int age = (Integer) value.get(1);
            String encType = (String) value.get(2);

            String val;
            if (value.get(3) != null) {
                int appStatus = (Integer) value.get(3);
                val = age + "|" + encType + "|" + appStatus;
            } else {
                // app_status_c is NULL: emit a trailing empty field so the
                // reducer always sees three columns after splitting on "|".
                val = age + "|" + encType + "|";
            }

            System.out.println("key: " + id);
            System.out.println("val: " + val);
            context.write(new Text(id), new Text(val));
        }
    }

    public static class Reduce extends Reducer<Text, Text, WritableComparable, HCatRecord> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // Split with limit -1 so trailing empty fields are kept.
                String[] cols = value.toString().split("\\|", -1);
                System.out.println("cols.length: " + cols.length);

                HCatRecord record = new DefaultHCatRecord(4);
                record.set(0, key.toString());                // id
                if (cols[0].length() > 0) {                   // age
                    record.set(1, Integer.parseInt(cols[0]));
                } else {
                    record.set(1, null);
                }
                record.set(2, cols[1]);                       // enc_type_c (partition key)
                if (cols[2].length() > 0) {                   // app_status_c (partition key)
                    record.set(3, Integer.parseInt(cols[2]));
                } else {
                    record.set(3, null);
                }
                // HCatOutputFormat ignores the output key.
                context.write(null, record);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String inputTableName = args[0];
        String outputTableName = args[1];
        String dbName = "test";

        Job job = Job.getInstance(conf, "Dynamic Partition Test");
        HCatInputFormat.setInput(job, dbName, inputTableName);
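        // Input assumptions (a sketch, not from the original source): the job
        // reads test.<inputTableName> through HCatInputFormat, whose schema
        // comes from the metastore. The Mapper above expects, positionally:
        //   0 = id (string), 1 = age (int),
        //   2 = enc_type_c (string), 3 = app_status_c (int, nullable).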
        // Older HCatalog API equivalent:
        // HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null, null));

        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(DynamicPartitionTest.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);

        // The dynamic partition keys must match the output table's partition
        // column names.
        List<String> partitions = new ArrayList<String>();
        partitions.add(0, "enc_type_c");
        partitions.add(1, "app_status_c");

        HCatFieldSchema partition0 =
                new HCatFieldSchema("enc_type_c", TypeInfoFactory.stringTypeInfo, null);
        HCatFieldSchema partition1 =
                new HCatFieldSchema("app_status_c", TypeInfoFactory.intTypeInfo, null);

        OutputJobInfo opJobInfo = OutputJobInfo.create(dbName, outputTableName, null);
        opJobInfo.setDynamicPartitioningKeys(partitions);
        HCatOutputFormat.setOutput(job, opJobInfo);

        // getTableSchema returns only the data columns; append the partition
        // columns so the reducer's 4-field records line up with the layout.
        HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
        s.append(partition0);
        s.append(partition1);
        HCatOutputFormat.setSchema(job, s);

        job.setOutputFormatClass(HCatOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DynamicPartitionTest(), args);
        System.exit(exitCode);
    }
}
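/*
 * A minimal sketch of the Hive DDL this job is written against. The table and
 * column names come from the code above; the column types are inferred from
 * the casts in the Mapper, and the storage format is an assumption:
 *
 *   CREATE TABLE test.output_table (
 *     id  STRING,
 *     age INT
 *   )
 *   PARTITIONED BY (enc_type_c STRING, app_status_c INT)
 *   STORED AS ORC;
 *
 * Dynamic-partition inserts also require the Hive side to permit them, e.g.:
 *
 *   SET hive.exec.dynamic.partition=true;
 *   SET hive.exec.dynamic.partition.mode=nonstrict;
 */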
/*
 * Earlier scratch version, kept commented out. It references symbols that are
 * not defined in this file (configurer, outputPathname, MemberAnalyzerReducer)
 * and does not compile as-is.
 *
 * public class HcatalogTest {
 *
 *   public static void main(String[] args) throws Exception {
 *     int result = ToolRunner.run(new Configuration(), new HcatalogTest(), args);
 *     System.exit(result);
 *   }
 *
 *   public int run(String[] args) throws Exception {
 *
 *     Date dtStart = new Date();
 *
 *     Configuration conf = getConf();
 *
 *     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 *     if (otherArgs.length != 3) {
 *       System.err.println("Usage: HcatalogTest <inputPathName> <param1> <param2>");
 *       int idx = 0;
 *       for (String otherArg : otherArgs) {
 *         System.err.println("Argument " + idx++ + " is [" + otherArg + "].");
 *       }
 *       System.exit(2);
 *     }
 *
 *     String countersPathname = "MemberAnalyzer.txt";
 *
 *     String inputPathName = otherArgs[0];
 *     String param1 = otherArgs[1];
 *     String param2 = otherArgs[2];
 *
 *     // all the conf.set calls must come before Job.getInstance
 *     Job job = Job.getInstance(conf, "HCatalog Test");
 *
 *     String dbName = "test";
 *     String[] tables = { "test_table_double_external_orc" };
 *
 *     List<String> partitions = new ArrayList<String>();
 *     partitions.add(0, "multiplier");
 *     // partitions.add(1, "process_date");
 *
 *     HCatFieldSchema partition0 =
 *         new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
 *     HCatFieldSchema partition1 =
 *         new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);
 *
 *     job.setMapOutputKeyClass(LongWritable.class);
 *     job.setMapOutputValueClass(Text.class);
 *
 *     job.setMapOutputKeyClass(Text.class);
 *     job.setMapOutputValueClass(Text.class);
 *
 *     job.setReducerClass(MemberAnalyzerReducer.class);
 *     job.setOutputKeyClass(NullWritable.class);
 *     job.setOutputValueClass(Text.class);
 *     job.setOutputFormatClass(TextOutputFormat.class);
 *
 *     FileOutputFormat.setOutputPath(job, new Path(outputPathname));
 *
 *     job.setNumReduceTasks(2);
 *
 *     for (String table : tables) {
 *       configurer.addOutputFormat(table, HCatOutputFormat.class,
 *           BytesWritable.class, HCatRecord.class);
 *
 *       OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
 *       outputJobInfo.setDynamicPartitioningKeys(partitions);
 *
 *       HCatOutputFormat.setOutput(configurer.getJob(table), outputJobInfo);
 *
 *       HCatSchema schema =
 *           HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
 *       schema.append(partition0);
 *       schema.append(partition1);
 *
 *       HCatOutputFormat.setSchema(configurer.getJob(table), schema);
 *     }
 *     configurer.configure();
 *
 *     return job.waitForCompletion(true) ? 0 : 1;
 *
 *     // unreachable after the return above:
 *     job.waitForCompletion(true);
 *
 *     System.out.println("");
 *     System.out.println("Job Completed");
 *     System.out.println("");
 *
 *     return 0;
 *   }
 *
 *   public static class HcatalogTestMapper extends Mapper { }
 *
 *   public static class HcatalogTestReducer
 *       extends Reducer<Text, Text, NullWritable, Text> {
 *     private Text outputValue_;
 *     private NullWritable nullWritable_;
 *
 *     @Override
 *     public void reduce(Text key, Iterable<Text> values, Context context)
 *         throws IOException, InterruptedException {
 *       context.progress();
 *       outputValue_.set("This is a text value");
 *       context.write(nullWritable_, outputValue_);
 *     }
 *   }
 * }
 */
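/*
 * Example invocation (a sketch; the jar name is a placeholder, and the
 * HCatalog/Hive jars must be on the classpath):
 *
 *   export HADOOP_CLASSPATH=$(hcat -classpath)
 *   hadoop jar dynamic-partition-test.jar \
 *       com.kgi.test.mapreduce.hcatalog.DynamicPartitionTest \
 *       input_table output_table
 */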