Member since 
    
	
		
		
		06-14-2016
	
	
	
	
	
	
	
	
	
	
	
	
	
	
			
      
                4
            
            
                Posts
            
        
                2
            
            
                Kudos Received
            
        
                0
            
            
                Solutions
            
        
			
    
	
		
		
		06-14-2016
	
		
		01:18 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
	
		1 Kudo
		
	
				
		
	
		
					
							 I have many records store in my mongodb. Just looks like this:  {
 "_id": xxxx,
  "url":"www.myurl.com/xxxxx",
  "summary":"THIS IS DOCUMENT IN THE $url"
}      What I want the result just looks like this:  {
    "_id": word_in_summary,
    "results":[
        { 
            "url": "the corresponding url that word appears in summary",
            "count": "the total count of the word in the summary"
        },
        { .... }
     ]
}
  for example:  {
    "_id" : ObjectId("574ec02d8bee4f03d9174c11"),
    "url" : "www.example_1.com/xxxx",
    "summary" : "hello good, good"
}
{
    "_id" : ObjectId("574ec02d8bee4f03d9174c12"),
    "url" : "www.example_2.com/xxxx",
    "summary" : "good"
}
  So, the result is:  {
    "_id": "hello",
    "results":[
        "url": "www.example_1.com/xxxx",
        "count": 1
     ]
}
{
    "_id": "good",
    "results":[
        {
            "url": "www.example_1.com/xxxx",
            "count": 2
        },
        {
             "url": "www.example_2.com/xxxx",
             "count": 1
        }
     ]
}  I am a newbie in Java and Hadoop. My code to process the data is:  import java.util.*; 
import java.io.*;
import org.bson.*;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {
    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();
    	public void map(Object key, BSONObject value, Context context ) 
    			throws IOException, InterruptedException {
    		String url = value.get("url").toString();
    		StringTokenizer itr = new StringTokenizer(value.get("summary").toString().replaceAll(",", " "));
    		while (itr.hasMoreTokens()) {
    			word.set(itr.nextToken() + " " + url);
    			context.write(word, one);
    		}
    	}
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, MapWritable> {
    	private MapWritable result = new MapWritable();
    	public void reduce(Text key, Iterable<IntWritable> values, Context context)
    		throws IOException, InterruptedException {
    		int sum = 0;
    		String str[] = key.toString().split(" ");
    		Text mykey= new Text(str[0]);
    		Text myurl = new Text(str[1]);
    		for (IntWritable val : values) {
    			sum += val.get();
    		}
    		System.out.println("sum : " + sum);
    		result.put(myurl, new IntWritable(sum));
    		context.write(mykey, result);
    	}
    }
    public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	conf.set( "mongo.input.uri" , "mongodb://localhost/testmr.stackin" );
    	conf.set( "mongo.output.uri" , "mongodb://localhost/testmr.stackout" );
    	@SuppressWarnings("deprecation")
		Job job = new Job(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setCombinerClass(IntSumReducer.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	job.setInputFormatClass( MongoInputFormat.class );
    	job.setOutputFormatClass( MongoOutputFormat.class );
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}  The errors:  16/06/14 08:58:43 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@46544712
java.io.IOException: wrong value class: class org.apache.hadoop.io.MapWritable is not class org.apache.hadoop.io.IntWritable
	at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
	at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1313)
	at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1630)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
	at WordCount$IntSumReducer.reduce(WordCount.java:46)
	at WordCount$IntSumReducer.reduce(WordCount.java:33)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
	at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1651)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1630)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720)
	at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2012)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:794)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
 
						
					
					... View more
				
			
			
			
			
			
			
			
			
			
		
		
			
				
						
							Labels:
						
						
		
			
	
					
			
		
	
	
	
	
				
		
	
	
- Labels:
 - 
						
							
		
			Apache Hadoop