Error: com.mongodb.BulkWriteException: Bulk write operation error on server

I am new to Hadoop and MongoDB, and I want to implement a word count over web documents stored in MongoDB.

My goal is to get the count of each word in each URL.

For example, given these input documents:

{
    "_id" : ObjectId("574ec02d8bee4f03d9174c11"),
    "url" : "www.example_1.com/xxxx",
    "summary" : "hello good, good"
}

{
    "_id" : ObjectId("574ec02d8bee4f03d9174c12"),
    "url" : "www.example_2.com/xxxx",
    "summary" : "good"
}

So, the result is:

{
    "_id": "hello",
    "results": [
        {
            "url": "www.example_1.com/xxxx",
            "count": 1
        }
     ]
}

{
    "_id": "good",
    "results":[
        {
            "url": "www.example_1.com/xxxx",
            "count": 2
        },
        {
             "url": "www.example_2.com/xxxx",
             "count": 1
        }
     ]
}
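
For testing, I load the two example documents into the input collection with the MongoDB Java driver. This is just a minimal sketch; the host 192.168.1.102 and the collection stack.mrtest are the values from the mongo.input.uri in the job below, so adjust them for your setup.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class LoadSamples {
    public static void main(String[] args) {
        // Host, database and collection match the mongo.input.uri used by the job.
        MongoClient client = new MongoClient("192.168.1.102");
        DBCollection input = client.getDB("stack").getCollection("mrtest");

        // The two example web documents from above.
        input.insert(new BasicDBObject("url", "www.example_1.com/xxxx")
                .append("summary", "hello good, good"));
        input.insert(new BasicDBObject("url", "www.example_2.com/xxxx")
                .append("summary", "good"));

        client.close();
    }
}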

Here is my code; it is simple.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;

public class WordCount {

    // Mapper: emits (word, {url: 1}) for every token in the "summary" field.
    public static class TokenizerMapper extends Mapper<Object, BSONObject, Text, BSONWritable> {
        private Text word = new Text();
        public void map(Object key, BSONObject value, Context context ) 
                                throws IOException, InterruptedException {
            String url = value.get("url").toString();
            StringTokenizer itr = new StringTokenizer(value.get("summary").toString().
                    replaceAll("\\p{Punct}|\\d","").replaceAll("\r\n", " ").replace("\r", " ").
                    replace("\n", " ").toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                BasicBSONObject urlCounts = new BasicBSONObject();
                urlCounts.put(url, 1);
                try {
                    context.write(word, new BSONWritable(urlCounts));
                } catch (Exception e) {
                    // TODO: handle exception
                    System.out.print("/t/t INFO: ");
                    e.printStackTrace();
                }

            }
        }
    }

    // Reducer: merges the per-URL counts for one word into a single BSON document.
    public static class IntSumReducer extends Reducer<Text, BSONWritable, Text, BSONWritable> {
        public void reduce(Text key, Iterable<BSONWritable> values, Context context)
            throws IOException, InterruptedException {
            HashMap<String, Integer> mymap = new HashMap<String, Integer>();
            BasicBSONObject result = new BasicBSONObject();
            for (BSONWritable val : values) {
                @SuppressWarnings("unchecked")
                HashMap<String, Integer> temp = (HashMap<String, Integer>) val.getDoc().toMap();
                for (Map.Entry<String, Integer> entry :  temp.entrySet()) {
                    if (mymap.containsKey(entry.getKey())) {
                        mymap.put(entry.getKey(), entry.getValue()+1);
                    }
                    else {
                        mymap.put(entry.getKey(), 1);
                    }
                }
            }
            result.putAll(mymap);
            try {
                context.write(key, new BSONWritable(result));
            } catch (Exception e) {
                // TODO: handle exception
                System.out.print("/t/t INFO: ");
                e.printStackTrace();
            }   
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set( "mongo.input.uri" , "mongodb://192.168.1.102/stack.mrtest" );
        conf.set( "mongo.output.uri" , "mongodb://192.168.1.102/stack.mrtest_out3" );
        Job job = Job.getInstance(conf, "word count");
        job.setNumReduceTasks(12);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BSONWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setInputFormatClass( MongoInputFormat.class );
        job.setOutputFormatClass( MongoOutputFormat.class );
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The code is simple, but when it runs on Hadoop, it reports:

Error: com.mongodb.BulkWriteException: Bulk write operation error on server 192.168.1.101:27017. Write errors: [com.mongodb.BulkWriteError@be979185].
        at com.mongodb.BulkWriteHelper.translateBulkWriteException(BulkWriteHelper.java:57)
        at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2202)
        at com.mongodb.DBCollection.executeBulkWriteOperation(DBCollection.java:2188)
        at com.mongodb.BulkWriteOperation.execute(BulkWriteOperation.java:121)
        at com.mongodb.hadoop.output.MongoOutputCommitter.commitTask(MongoOutputCommitter.java:176)
        at org.apache.hadoop.mapred.Task.commit(Task.java:1163)
        at org.apache.hadoop.mapred.Task.done(Task.java:1025)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:397)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

When the number of records is small, say 1k, it works fine. But when the number of records is very large, say 10 million, it reports the above error in the reduce stage of the MapReduce job!

I think a key may already exist in MongoDB, and when a new record with the same key is inserted, it reports this error.
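
If that is the cause, the same kind of failure can be reproduced directly with the MongoDB Java driver: inserting two documents with the same _id in one bulk operation throws a BulkWriteException whose write error has code 11000 (duplicate key). The sketch below is only to test this guess; the host and collection are the ones from my mongo.output.uri.

import com.mongodb.BasicDBObject;
import com.mongodb.BulkWriteError;
import com.mongodb.BulkWriteException;
import com.mongodb.BulkWriteOperation;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class DuplicateKeyRepro {
    public static void main(String[] args) {
        // Same server and collection as mongo.output.uri in the job above.
        MongoClient client = new MongoClient("192.168.1.102");
        DBCollection out = client.getDB("stack").getCollection("mrtest_out3");

        BulkWriteOperation bulk = out.initializeUnorderedBulkOperation();
        bulk.insert(new BasicDBObject("_id", "good").append("count", 1));
        bulk.insert(new BasicDBObject("_id", "good").append("count", 2)); // same _id again

        try {
            bulk.execute();
        } catch (BulkWriteException e) {
            // A duplicate _id shows up as a write error with code 11000.
            for (BulkWriteError err : e.getWriteErrors()) {
                System.out.println(err.getCode() + ": " + err.getMessage());
            }
        }
        client.close();
    }
}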

But I don't think there is anything wrong with my code. How should I fix it? Thank you very much!