I have many records stored in MongoDB. Each one looks like this:

{
    "_id": xxxx,
    "url": "www.myurl.com/xxxxx",
    "summary": "THIS IS DOCUMENT IN THE $url"
}

What I want the result to look like is this:

{
    "_id": word_in_summary,
    "results": [
        {
            "url": "the url of a document whose summary contains the word",
            "count": "how many times the word occurs in that summary"
        },
        { .... }
    ]
}
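
In other words, I want to build an inverted index: each word maps to the documents whose summary contains it, together with a per-document count. A plain-Java sketch of the target structure (the class and method names here are my own, just for illustration):

import java.util.*;

public class InvertedIndexSketch {

    // word -> (url -> count), mirroring the desired {_id: word, results: [...]} shape
    static void addSummary(Map<String, Map<String, Integer>> index, String url, String summary) {
        for (String word : summary.replaceAll(",", " ").split("\\s+")) {
            if (!word.isEmpty()) {
                index.computeIfAbsent(word, w -> new HashMap<>())
                     .merge(url, 1, Integer::sum); // add 1 to this word's count for this url
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> index = new HashMap<>();
        addSummary(index, "www.example_1.com/xxxx", "hello good, good");
        addSummary(index, "www.example_2.com/xxxx", "good");
        // Prints something like (entry order may vary):
        // {good={www.example_1.com/xxxx=2, www.example_2.com/xxxx=1}, hello={www.example_1.com/xxxx=1}}
        System.out.println(index);
    }
}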
For example:

{
    "_id" : ObjectId("574ec02d8bee4f03d9174c11"),
    "url" : "www.example_1.com/xxxx",
    "summary" : "hello good, good"
}
{
    "_id" : ObjectId("574ec02d8bee4f03d9174c12"),
    "url" : "www.example_2.com/xxxx",
    "summary" : "good"
}
So the result should be:

{
    "_id": "hello",
    "results": [
        {
            "url": "www.example_1.com/xxxx",
            "count": 1
        }
    ]
}
{
    "_id": "good",
    "results": [
        {
            "url": "www.example_1.com/xxxx",
            "count": 2
        },
        {
            "url": "www.example_2.com/xxxx",
            "count": 1
        }
    ]
}

I am new to Java and Hadoop. My code to process the data is:

import java.util.*;
import java.io.*;
import org.bson.*;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, BSONObject value, Context context)
                throws IOException, InterruptedException {
            String url = value.get("url").toString();
            // Tokenize the summary (commas replaced by spaces) and emit the
            // composite key "word url" with a count of 1 for each token.
            StringTokenizer itr = new StringTokenizer(value.get("summary").toString().replaceAll(",", " "));
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken() + " " + url);
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, MapWritable> {

        // Note: this single MapWritable is reused across reduce() calls and never cleared.
        private MapWritable result = new MapWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // Split the composite "word url" key back apart.
            String str[] = key.toString().split(" ");
            Text mykey = new Text(str[0]);
            Text myurl = new Text(str[1]);
            for (IntWritable val : values) {
                sum += val.get();
            }
            System.out.println("sum : " + sum);
            result.put(myurl, new IntWritable(sum));
            context.write(mykey, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mongo.input.uri", "mongodb://localhost/testmr.stackin");
        conf.set("mongo.output.uri", "mongodb://localhost/testmr.stackout");
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The errors:

16/06/14 08:58:43 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@46544712
java.io.IOException: wrong value class: class org.apache.hadoop.io.MapWritable is not class org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1313)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1630)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at WordCount$IntSumReducer.reduce(WordCount.java:46)
    at WordCount$IntSumReducer.reduce(WordCount.java:33)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1651)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1630)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2012)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:794)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
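
From the stack trace, the exception is thrown while the combiner runs (Task$NewCombinerRunner.combine), so I suspect the problem is job.setCombinerClass(IntSumReducer.class): a combiner must consume and produce the map output types (here Text/IntWritable), but IntSumReducer produces MapWritable values. A minimal sketch of what I believe the fix looks like (untested; IntSumCombiner is a new class I am adding, not part of the original code):

    // A combiner must emit the same key/value types the mapper emits.
    public static class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum)); // Text/IntWritable, matching the map output
        }
    }

    // Driver changes in main(): use the new combiner (or drop the combiner entirely,
    // since combiners are optional) and declare the map and reduce output types
    // separately, because they differ in this job.
    job.setCombinerClass(IntSumCombiner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MapWritable.class); // the reducer emits MapWritable values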
Labels:
- Apache Hadoop