Reply
Highlighted
New Contributor
Posts: 3
Registered: ‎02-15-2019

Hadoop MapReduce wrong result

Hello, i created a Hadoop MapReduce program in ubuntu. I tested my program on Hadoop localhost and it worked correctly. But when i tried to run it on a 5 machine cluster in cloudera AWS the result was wrong. Can someone help me please
New Contributor
Posts: 1
Registered: ‎02-16-2019

Re: Hadoop MapReduce wrong result

Hello myriamj, Your problem's reason can be result of a lot of situations so that you should more details about your code and results. On the other hand if your code run successfully on single cluster you should pay attention about static variables , because your program can't run same JVM on multicluster system so that your static variables can't reachable on another cluster. However if you use single cluster system your code will work successfully because when your program ran on single cluster your mappers will work on same JVM and static fields can be reach. Maybe your solution can be that but you should more detail about your code.

New Contributor
Posts: 3
Registered: ‎02-15-2019

Re: Hadoop MapReduce wrong result

[ Edited ]

Thank you for your response. Here is my code can you help me please. (the input file looks like:

1 0 2:3:
2 10000 1:4:5:
3 10000 1:
4 10000 2:5:
5 10000 2:4:

)



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Scanner;

public class DGC_m4_random extends Configured implements Tool {

    public static String OUT = "output";
    public static String IN = "inputlarger";
    public static String graph = "";
    public static int nbItérations = 2;
    public static String exeName = "";
    private static String outputName;

    public static class DijkstraMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Key is node n
            // Value is D, Points-To
            // For every point (or key), look at everything it points to.
            // Emit or write to the points to variable with the current distance
            // + 1


            Text word = new Text();
            String line = value.toString();// looks like 1 0 2:3:
            String[] sp = line.split(" ");// splits on space
            int distanceAdded = Integer.parseInt(sp[1]) + 1;
            System.out.println("sp[2]..." + line + sp);
            String[] pointsTo = sp[2].split(":");
            for (String distance : pointsTo) {
                word.set("VALUE " + distanceAdded);// tells me to look at
                                                    // distance value
                context.write(new LongWritable(Integer.parseInt(distance)), word);
                word.clear();
            }

            // pass in current node's distance (if it is the lowest distance)
            word.set("VALUE " + sp[1]);
            context.write(new LongWritable(Integer.parseInt(sp[0])), word);
            word.clear();
            word.set("NODES " + sp[2]);// tells me to append on the final tally
            context.write(new LongWritable(Integer.parseInt(sp[0])), word);
            word.clear();

        }
    }

    public static class DijkstraReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // From slide 20 of Graph Algorithms with MapReduce (by Jimmy Lin,
            // Univ @ Maryland)
            // The key is the current point
            // The values are all the possible distances to this point
            // we simply emit the point and the minimum distance value

            String nodes = "UNMODED";
            Text word = new Text();
            int lowest = 10009;// start at infinity
          
            for (Text val : values) {// looks like NODES/VALUES 1 0 2:3:, we
                                        // need to use the first as a key
                String[] sp = val.toString().split(" ");// splits on space
                // look at first value
                if (sp[0].equalsIgnoreCase("NODES")) {
                    nodes = null;
                    nodes = sp[1];
                } else if (sp[0].equalsIgnoreCase("VALUE")) {
                    int distance = Integer.parseInt(sp[1]);
                    lowest = Math.min(distance, lowest);
                }
            }
            word.set(lowest + " " + nodes);
            context.write(key, word);
            word.clear();
        }
    }

    // Almost exactly from
    // http://hadoop.apache.org/mapreduce/docs/current/mapred_tutorial.html
    @Override
    public int run(String[] args) throws Exception {
        // http://code.google.com/p/joycrawler/source/browse/NetflixChallenge/src/org/niubility/learning/knn/KN...
        // make the key -> value space separated (for iterations)
        getConf().set("mapred.textoutputformat.separator", " ");

        // set in and out to args.
        boolean isdone = false;
        boolean success = false;
        boolean modIsImproved = true;
        float prevMod = 0;
        float mod = 0;


        IN = args[0];
        graph = args[0];

        int totalEdges = 0;
        
        Path ddir = new Path(exeName);
        FileSystem dfs = FileSystem.get(getConf());
        dfs.delete(ddir, true);

        try {
            totalEdges = calculerTotalEdge(IN);
            System.out.println("totalEdges ....>" + totalEdges);
        } catch (Exception e) {
            System.out.println("Exception ....>" + e);
        }

        int num = 1;
        ArrayList<String> lstGr = new ArrayList<>();
        ArrayList<String> newlstGr = new ArrayList<>();
        lstGr.add(IN);

        int niv = 0;

        int i = 1;
        String text_mod = "";

        while (nbItérations >= i) {

            System.out.println("****************************** niveau " + niv + "****************************");

            prevMod = mod;
            mod = 0;
            num = 1;
            newlstGr.clear();

            for (String gr : lstGr) {

                args[0] = gr;
                IN = args[0];
                Object[] tab = diviser(totalEdges, args, num, niv);
                mod = mod + (float) tab[0];
                newlstGr.add((String) tab[1]);
                newlstGr.add((String) tab[2]);
                num = num + 2;

            }
            

            for (String gr : newlstGr) {
                for (String g : newlstGr) {

                    if (!g.equals(gr)) {

                        Integer n = getNBOver(graph, gr, g);
                        mod = mod - (n * n) / (totalEdges * totalEdges);

                    }

                }

            }

            lstGr = newlstGr;
            newlstGr = new ArrayList<>();

            System.out.println("mod : " + mod);
            System.out.println("prevMod: " + prevMod);

            text_mod = text_mod + i + " : Itération " + "------>  mod = " + mod + "\n";
            i++;

            if (mod <= prevMod)
            {
                modIsImproved = false;
                
                
            }
            
            

            niv++;

        }

        System.out.println("liste des mod : \n" + text_mod);
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out2 = fs.create(new Path(exeName+"/resultat"+"/mod"));
        out2.write(text_mod.getBytes());
        out2.close();
        
        
        for (String g : lstGr) {
            
            String nom = g.split("out")[1];
            
            copyFile(g, exeName+"/resultat"+"/graphs/"+nom);    
        }

        return success ? 0 : 1;
    }

    
    
    Object[] diviser(int totalEdges, String[] args, int num, int niv) throws Exception {

        Object[] tab = new Object[3];

    //    int LongestDistance = -1;
        int LongestNd = -1;
        int LongestNs = -1;
//
         int LigneLongestNd = -1;
        int LigneLongestNs = -1;

        FileSystem fsi = FileSystem.get(new Configuration());
        String copy = outputName+"/IN.txt";
        copyFile(IN, copy);

        BufferedReader bri = new BufferedReader(new InputStreamReader(fsi.open(new Path(copy))));
        String linei = bri.readLine();

        int lineIndex = 0;
       ArrayList<Integer> lstN = new ArrayList<>();
        while (linei != null) {

            // each line looks like 0 1 2:3:
            String[] spi = linei.split(" ");
            int nodei = Integer.parseInt(spi[0]);
            linei = bri.readLine();

            
            lstN.add(nodei);
            

        } // fin calcul

        int r1;
        int r2;
        
        do{
             r1= (int) ((Math.random()*(lstN.size()-1)));
             r2= (int) ((Math.random()*(lstN.size()-1)));
        }
        
        while(r1==r2 );
            
        
        LongestNd=lstN.get(r1);
        LongestNs=lstN.get(r2);
        
        LigneLongestNd=r1+1;
        LigneLongestNs=r2+1;
            
        
        
        System.out.println("--------------- Final -----------------------");
        System.out.println("LigneLongestNs: " + LigneLongestNs);
        System.out.println("LigneLongestNd: " + LigneLongestNd);

    

        bri.close();
        // fsi.close();

        System.out.println("--------------------------------------");

        updateFile(LigneLongestNs, IN, "10000");
        LinkedHashMap<Integer, Integer> _maps = dijkstra(args);

        updateFile(LigneLongestNd, IN, "10000");
        LinkedHashMap<Integer, Integer> _mapd = dijkstra(args);

        ArrayList<Integer> ns = new ArrayList<>();
        ArrayList<Integer> nd = new ArrayList<>();

        for (Object n : _maps.keySet()) {

            if (_maps.get(n) > _mapd.get(n)) {
                nd.add((Integer) n);
            } else // if (_maps.get(n) < _mapd.get(n))
                ns.add((Integer) n);
            // else {
            // if (Math.random() > 0.5)
            // nd.add((Integer) n);
            // else
            // ns.add((Integer) n);
            //
            // }
        }
        Integer[] t = writeSgFile(IN, outputName +"/sg_" + niv + "_" + num + ".txt", outputName +"/sg_" + niv + "_" + (num + 1) + ".txt", ns, nd,
                "10000");
        float modularity = 0;
        // modularity = (float) t[0] / totalEdges - ((float) t[2] / totalEdges)
        // * ((float) t[2] / totalEdges)
        // + (float) t[1] / totalEdges - ((float) t[3] / totalEdges) * ((float)
        // t[3] / totalEdges);
        modularity = (float) t[2] / totalEdges + (float) t[3] / totalEdges;

        tab[0] = modularity;
        tab[1] = outputName +"/sg_" + niv + "_" + num + ".txt";
        tab[2] = outputName +"/sg_" + niv + "_" + (num + 1) + ".txt";

        return tab;
    }

    private int calculerTotalEdge(String IN) throws IOException {
        FileSystem fsi = FileSystem.get(new Configuration());
        BufferedReader bri = new BufferedReader(new InputStreamReader(fsi.open(new Path(IN))));
        String linei = bri.readLine();
        int nbEdge = 0;

        ArrayList<Integer> nods = new ArrayList<Integer>();

        while (linei != null) {

            // each line looks like 0 1 2:3:
            String[] spi = linei.split(" ");

            int nodei = Integer.parseInt(spi[0]);

            for (int i = 0; i < spi[2].split(":").length; i++) {

                if (!nods.contains(Integer.parseInt(spi[2].split(":")[i])))

                    nbEdge++;

            }

            nods.add(nodei);
            linei = bri.readLine();

        }

        bri.close();
        // fsi.close();

        return nbEdge;
    }

    public LinkedHashMap<Integer, Integer> dijkstra(String[] args) throws Exception {

        IN = args[0];
        OUT = args[1];

        String infile = IN;
        String outputfile = OUT + System.nanoTime();

        boolean isdone = false;
        boolean success = false;

        LinkedHashMap<Integer, Integer> _map = new LinkedHashMap<Integer, Integer>();

        ///////////////////////////////

        while (!isdone) {

            Job job = new Job(getConf(), "Dijkstra");
            job.setJarByClass(DGC_m4_random.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(DijkstraMapper.class);
            job.setReducerClass(DijkstraReducer.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(infile));
            FileOutputFormat.setOutputPath(job, new Path(outputfile));

            success = job.waitForCompletion(true);

            // remove the input file
            // http://eclipse.sys-con.com/node/1287801/mobile

            if (!infile.equals(IN)) {

                String indir = infile.replace("part-r-00000", "");
                Path ddir = new Path(indir);
                FileSystem dfs = FileSystem.get(getConf());
                dfs.delete(ddir, true);
            }

            infile = outputfile + "/part-r-00000";
            outputfile = OUT + System.nanoTime();

            // do we need to re-run the job with the new input file??
            // http://www.hadoop-blog.com/2010/11/how-to-read-file-from-hdfs-in-hadoop.html
            isdone = true;// set the job to NOT run again!
            Path ofile = new Path(infile);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(ofile)));

            LinkedHashMap<Integer, Integer> imap = new LinkedHashMap<Integer, Integer>();
            String line = br.readLine();
            while (line != null) {
                // each line looks like 0 1 2:3:
                // we need to verify node -> distance doesn't change
                String[] sp = line.split(" ");
                int node = Integer.parseInt(sp[0]);
                int distance = Integer.parseInt(sp[1]);
                imap.put(node, distance);
                line = br.readLine();
            }

            if (_map.isEmpty()) {
                // first iteration... must do a second iteration regardless!
                isdone = false;
            } else {
                // http://www.java-examples.com/iterate-through-values-java-hashmap-example
                // http://www.javabeat.net/articles/33-generics-in-java-50-1.html
                for (Integer key : imap.keySet()) {
                    int val = imap.get(key);
                    if (_map.get(key) != val) {
                        // values aren't the same... we aren't at
                        // convergence yet
                        isdone = false;
                    }
                }
            }

            if (!isdone) {
                _map.putAll(imap);// copy imap to _map for the next
                                    // iteration (if required)
            }

            br.close();
            // fs.close();
        }

        return _map;
    }

    public static void updateFile(int lineIndex, String in, String inf) {
        try {
            Path p = new Path(in);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)));

            String line = "", text = "";
            int i = 0;

            while ((line = reader.readLine()) != null) {

                if (i == lineIndex - 1) {
                    String[] sp = line.split(" ");
                    line = sp[0] + " " + inf + " " + sp[2];
                    text += line + "\r\n";
                } else if (i == lineIndex) {

                    String[] sp = line.split(" ");
                    line = sp[0] + " " + 0 + " " + sp[2];
                    text += line + "\r\n";

                } else {
                    text += line + "\r\n";
                }

                i++;
            }
            reader.close();

            // FileWriter writer = new FileWriter(in);
            // writer.write(text);
            // writer.close();

            FSDataOutputStream out2 = fs.create(new Path(in));
            out2.write(text.getBytes());
            out2.close();

            // fs.close();

        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    public static void copyFile(String in, String out) {
        try {
            Path p = new Path(in);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)));

            String line = "", text = "";

            while ((line = reader.readLine()) != null) {

                text += line + "\r\n";

            }
            reader.close();

            FSDataOutputStream out2 = fs.create(new Path(out));
            out2.write(text.getBytes());
            out2.close();

            // fs.close();

        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }

    
    public static Integer getNBOver(String g, String sg1, String sg2) {

        Integer nbNOver = 0;
        try {

            FileSystem fs = FileSystem.get(new Configuration());

            Path p1 = new Path(sg1);
            BufferedReader reader1 = new BufferedReader(new InputStreamReader(fs.open(p1)));

            String line1 = "";

            while ((line1 = reader1.readLine()) != null) {

                String[] sp1 = line1.split(" ");
                Integer n1 = Integer.parseInt(sp1[0]);

                Path p2 = new Path(sg2);
                BufferedReader reader2 = new BufferedReader(new InputStreamReader(fs.open(p2)));

                String line2 = "";

                while ((line2 = reader2.readLine()) != null) {

                    String[] sp2 = line2.split(" ");
                    Integer n2 = Integer.parseInt(sp2[0]);

                    Path p = new Path(g);
                    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)));

                    String line = "";

                    boolean fin = false;
                    while ((line = reader.readLine()) != null && !fin) {

                        String[] sp = line.split(" ");
                        Integer n = Integer.parseInt(sp[0]);

                        if (n == n1) {
                            if (line.contains(":" + n2 + ":") || line.contains(" " + n2 + ":")) {
                                fin = true;
                                nbNOver++;
                            }
                        } else if (n == n2) {
                            if (line.contains(":" + n1 + ":") || line.contains(" " + n1 + ":")) {
                                fin = true;
                                nbNOver++;
                            }
                        }

                    }
                    reader.close();

                }
                reader2.close();

            }
            reader1.close();
            // fs.close();

        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return nbNOver;
    }

    

    public static Integer[] writeSgFile(String in_g, String out_sg1, String out_sg2, ArrayList<Integer> ng1,
            ArrayList<Integer> ng2, String inf) {

        Integer nbNOver1 = 0;
        Integer nbNOver2 = 0;
        Integer nbEdge1 = 0;
        Integer nbEdge2 = 0;
        ArrayList<String> lstEdge = new ArrayList<String>();

        try {

            // Path i = new Path(in_g);
            // FileSystem fs = FileSystem.get(new Configuration());
            // BufferedReader reader = new BufferedReader(new
            // InputStreamReader(fs.open(i)));
            //

            Path p = new Path(in_g);
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)));

            String line = "";
            String text1 = "";
            String text2 = "";
            String l;
            String dis1 = "0";
            String dis2 = "0";

            while ((line = reader.readLine()) != null) {

                String[] sp = line.split(" ");

                Integer n = Integer.parseInt(sp[0]);
                l = "";

                if (ng1.contains(n)) {

                    l = n + " " + dis1 + " ";

                    for (String a : sp[2].split(":")) {

                        if (ng1.contains(Integer.parseInt(a))) {
                            l = l + a + ":";

                            if (!lstEdge.contains(n + " " + a) && !lstEdge.contains(a + " " + n)) {
                                nbEdge1++;
                                lstEdge.add(n + " " + a);
                            }

                        } else {
                            nbNOver1++;
                        }
                    }

                    text1 += l + "\r\n";
                    dis1 = inf;
                }

                if (ng2.contains(n)) {

                    l = n + " " + dis2 + " ";

                    for (String a : sp[2].split(":")) {

                        if (ng2.contains(Integer.parseInt(a))) {
                            l = l + a + ":";
                            if (!lstEdge.contains(n + " " + a) && !lstEdge.contains(a + " " + n)) {
                                nbEdge2++;
                                lstEdge.add(n + " " + a);
                            }
                        } else {
                            nbNOver2++;
                        }
                    }

                    text2 += l + "\r\n";
                    dis2 = inf;
                }

            }
            reader.close();

            // FSDataOutputStream out1 = fs.create( new Path(out_sg1));
            // out1.write(text1.getBytes());

            FSDataOutputStream writer = fs.create(new Path(out_sg1));
            writer.write(text1.getBytes());
            writer.close();

            // FSDataOutputStream out2 = fs.create( new Path(out_sg2));
            // out2.write(text2.getBytes());

            writer = fs.create(new Path(out_sg2));
            writer.write(text2.getBytes());
            writer.close();

            reader.close();
            // fs.close();

        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        Integer[] t = new Integer[4];
        t[0] = nbNOver1;
        t[1] = nbNOver2;

        t[2] = nbEdge1;
        t[3] = nbEdge2;

        return t;
    }

    public static void main(String[] args) throws Exception {
        
        
        System.out.println("Exe Name :");
        Scanner s = new Scanner(System.in);
        exeName=s.next();
       
        outputName= exeName+"/out";
        
        args[1]= outputName +"/"+args[1] ;
     
        System.exit(ToolRunner.run(new DGC_m4_random(), args));
    }

}

New Contributor
Posts: 3
Registered: ‎02-15-2019

Re: Hadoop MapReduce wrong result

Ihis could not solve my problem. Can some one help me
Announcements