
How to save a MapReduce Reducer's output without the Key,Value pair?


I am writing a MapReduce program to process DICOM images.

The purpose of this MapReduce program is to process a DICOM image, extract metadata from it, index the metadata into Solr, and finally, in the reducer phase, save the raw image to HDFS.

In other words, I want the reducer output to be the same file, stored in HDFS.

I have most of the functionality working, but in the reducer phase, storing the same file in HDFS does not work.

I have tested the processed DICOM file with a DICOM image viewer, which reports that the file is corrupted; the processed file is also slightly larger than the original. **Ex.** The original DICOM file is 628 KB, but after the reducer saves it to HDFS, its size changes to 630 KB.

I have tried the solutions from these links, but none of them gives the expected result.
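
To make the goal concrete: what I want to end up with in HDFS is a byte-for-byte copy of the original file, i.e. something equivalent to the plain FileSystem write below (this is only a sketch to illustrate the goal; the class name, output path, and byte array are placeholders):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: the byte-for-byte copy I am after, written with the plain
    // FileSystem API outside of MapReduce. The URI, output path, and byte array
    // are placeholders.
    public class RawCopySketch {
        public static void copy(byte[] rawDicomBytes) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.38.68:8020"), conf);
            try (FSDataOutputStream out = fs.create(new Path("/output/image.dcm"))) {
                out.write(rawDicomBytes); // exactly the bytes the RecordReader read
            }
        }
    }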

Here is the code that reads each DICOM file as a single record (without splitting it).


    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{     

        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }        
    }


Custom RecordReader


    public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{
    
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
        
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {        
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();        
        }
        
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                System.out.println("Inside nextKeyvalue");
                System.out.println(fileSplit.getLength());
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
        
        @Override
        public void close() throws IOException {
            
        }
    
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException 
        {
            return NullWritable.get();
        }
    
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return processed ? 1.0f : 0.0f;
        }
        
    }


Mapper Class

The mapper class works perfectly for our needs.


    public class MapClass{
    
        public static class Map extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{    
        
            @Override
            protected void map(NullWritable key, BytesWritable value,
                    Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                    throws IOException, InterruptedException {
                value.setCapacity(value.getLength());
                InputStream in = new ByteArrayInputStream(value.getBytes());            
                ProcessDicom.metadata(in); // Process dicom image and extract metadata from it
                Text keyOut = getFileName(context);
                context.write(keyOut, value);
    
            }
        
            private Text getFileName(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
            {
                InputSplit spl = context.getInputSplit();
                Path filePath = ((FileSplit)spl).getPath();
                String fileName = filePath.getName();
                Text text = new Text(fileName);
                return text;
            }
            
            @Override
            protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                    throws IOException, InterruptedException {
                super.setup(context);
            }
        
        }
    }
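
As a side note, I think the setCapacity() call in the mapper is needed because BytesWritable.getBytes() returns the whole backing buffer, which can be longer than the actual data. If one needs exactly the file's bytes, something like this should work (a small sketch; the helper class name is my own):

    import java.util.Arrays;

    import org.apache.hadoop.io.BytesWritable;

    // Small sketch: copy out exactly getLength() bytes of a BytesWritable,
    // because getBytes() may return a padded backing buffer.
    public class ExactBytes {
        public static byte[] of(BytesWritable value) {
            return Arrays.copyOf(value.getBytes(), value.getLength());
        }
    }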


Reducer Class

This is the reducer class.

    public class ReduceClass{    
        public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, BytesWritable>{
    
            @Override
            protected void reduce(Text key, Iterable<BytesWritable> value,
                    Reducer<Text, BytesWritable, BytesWritable, BytesWritable>.Context context)
                    throws IOException, InterruptedException {

                Iterator<BytesWritable> itr = value.iterator();
                while (itr.hasNext()) {
                    BytesWritable wr = itr.next();
                    wr.setCapacity(wr.getLength());
                    // Write the value just read; calling itr.next() again here would skip records.
                    context.write(new BytesWritable(key.copyBytes()), wr);
                }
            }
        }
    }
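
One direction I keep wondering about (based on the suggestions I have read, not something I have working) is a custom FileOutputFormat whose RecordWriter writes only the raw value bytes, with no key and no SequenceFile framing. A rough, untested sketch of what I mean (the class name and the .dcm extension are my own placeholders):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Rough sketch of a "raw bytes only" output format: the RecordWriter ignores
    // the key and writes just the value bytes, so no length fields or sync markers
    // are added around the image. Class name and extension are placeholders.
    public class RawBytesOutputFormat extends FileOutputFormat<NullWritable, BytesWritable> {

        @Override
        public RecordWriter<NullWritable, BytesWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            Path file = getDefaultWorkFile(job, ".dcm"); // e.g. part-r-00000.dcm
            FileSystem fs = file.getFileSystem(job.getConfiguration());
            final FSDataOutputStream out = fs.create(file, false);
            return new RecordWriter<NullWritable, BytesWritable>() {
                @Override
                public void write(NullWritable key, BytesWritable value) throws IOException {
                    // Only the valid portion of the buffer, not the full backing array.
                    out.write(value.getBytes(), 0, value.getLength());
                }

                @Override
                public void close(TaskAttemptContext context) throws IOException {
                    out.close();
                }
            };
        }
    }

If this is the right direction, I assume the reducer would emit NullWritable keys and the driver would set job.setOutputFormatClass(RawBytesOutputFormat.class) and job.setOutputKeyClass(NullWritable.class), but I have not verified any of this.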


Main Class


    public class DicomIndexer{
    
        public static void main(String[] argss) throws Exception{
            String args[] = {"file:///home/b3ds/storage/dd","hdfs://192.168.38.68:8020/output"};
            run(args);
        }
        
        public static void run(String[] args) throws Exception {
            
            //Initialize the Hadoop job and set the jar as well as the name of the Job
            Configuration conf = new Configuration();
            Job job = new Job(conf, "DicomIndexer");
            job.setJarByClass(DicomIndexer.class);
    //        job.getConfiguration().set("mapreduce.output.basename", "hi");
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(BytesWritable.class);
            job.setOutputKeyClass(BytesWritable.class);
            job.setOutputValueClass(BytesWritable.class);
            
            job.setMapperClass(Map.class);
            // No combiner here: the reducer's output types (BytesWritable, BytesWritable)
            // do not match the map output types (Text, BytesWritable), so it cannot be
            // reused as a combiner.
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(WholeFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
                    
            WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.waitForCompletion(true);
            
        }
        
    }

So I am completely clueless about what to do. Some of the links say it is not possible, since MapReduce works on <Key,Value> pairs, and some say to use NullWritable. So far I have tried NullWritable and SequenceFileOutputFormat, but neither of them works.
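
To show what I mean by trying NullWritable, the reducer variant I understand from those suggestions looks roughly like this (a simplified sketch, not my exact code; the class name is a placeholder), with job.setOutputKeyClass(NullWritable.class) set in the driver:

    import java.io.IOException;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Simplified sketch of the NullWritable variant: emit NullWritable as the key
    // so only the value is written. With SequenceFileOutputFormat this still does
    // not give me a byte-for-byte copy of the original file.
    public class NullKeyReduce extends Reducer<Text, BytesWritable, NullWritable, BytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            for (BytesWritable value : values) {
                context.write(NullWritable.get(), value);
            }
        }
    }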