Reply
New Contributor
Posts: 1
Registered: ‎01-03-2018

Hadoop performance Issues

[ Edited ]

Hello Friends, I am doing a hadoop project where I am working with 100MB, 500MB, 1GB files. A multinode hadoop cluster with 4 nodes is implemented for the purpose. The time taken for running the mapreduce program in multinode cluster is much larger than the time taken in running single node cluster setup. Also, it is shocking to observe that the basic Java program(without Hadoop) finishes the operation faster than both the single and multi node clusters. Here is the code for the mapper class:

 

public class myMapperClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{

 private final static IntWritable one = new IntWritable(1);
 private final static IntWritable two = new IntWritable(2);
 private final static IntWritable three = new IntWritable(3);
 private final static IntWritable four = new IntWritable(4);
 private final static IntWritable five = new IntWritable(5);
 private final static IntWritable six = new IntWritable(6);
 private final static IntWritable seven = new IntWritable(7);
 private final static IntWritable eight = new IntWritable(8);
 private final static IntWritable nine= new IntWritable(9);

  private Text srcIP,srcIPN;
  private Text dstIP,dstIPN;
  private Text srcPort,srcPortN;
  private Text dstPort,dstPortN;
  private Text counter1,counter2,counter3,counter4,counter5 ;
  //private Text total_records;

  int ddos_line = 0;
  //map method that performs the tokenizer job and framing the initial key value pairs
  @Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
  {
        String line1 = value.toString();
        ddos_line++;
        int pos1=0;
        int lineno=0;

        int[] count = {100000, 100000, 100000, 100000, 100000};
        int[] lineIndex = {0, 0, 0, 0, 0};

        for(int i=0;i<9;i++)
        {
            pos1 = line1.indexOf("|",pos1+1);
        }

        srcIP =  new Text( line1.substring(0,line1.indexOf("|")) );
        String srcIPP = srcIP.toString();
        dstIP = new Text(line1.substring( srcIPP.length()+1,line1.indexOf("|",srcIPP.length()+1)) ) ;

        srcPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );
        pos1 = line1.indexOf("|",pos1+1);
        dstPort = new Text( line1.substring(pos1+1,line1.indexOf("|",pos1+1)) );

        //BufferedReader br = new BufferedReader(new FileReader("/home/yogi/Desktop/normal_small"));            

        FileSystem fs = FileSystem.get(new Configuration());            
        FileStatus[] status = fs.listStatus(new Path("hdfs://master:54310/usr/local/hadoop/input/normal_small"));            
        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(status[0].getPath())));           

        String line=br.readLine();

        lineno++;
        boolean bool = true;
        while (bool) {

            for(int i=0; i<5;i++)
            {
                if(bool==false)
                    break;
                int pos=0;
                int temp;
                for(int j=0;j<9;j++)
                {
                    pos = line.indexOf("|",pos+1);
                }


                srcIPN =  new Text( line.substring(0,line.indexOf("|")) );
                String srcIPP2 = srcIPN.toString();
                dstIPN = new Text(line.substring( srcIPP2.length()+1,line.indexOf("|",srcIPP2.length()+1)) ) ;

                srcPortN = new Text( line.substring(pos+1,line.indexOf("|",pos+1)) );
                pos = line.indexOf("|",pos+1);
                dstPortN = new Text( line.substring(pos+1,line.indexOf("|",pos+1)) );


                if(srcIP.equals(srcIPN) && dstIP.equals(dstIPN))
                {
                    int tmp, tmp2;

                    tmp = Integer.parseInt(srcPort.toString()) - Integer.parseInt(srcPortN.toString());
                    if(tmp<0)
                        tmp*=-1;

                    tmp2 = Integer.parseInt(dstPort.toString()) - Integer.parseInt(dstPortN.toString());
                    if(tmp2<0)                      
                        tmp2*=-1;

                    temp=tmp+tmp2;


                    if(count[4] > temp)
                    {
                        count[4] = temp;
                        lineIndex[4]=lineno;
                    } 


                    for(int k=0;k<5;k++)
                    {
                        for(int j=0;j<4;j++)
                        {   
                            if(count[j] > count[j+1]) 
                            {
                                int temp2 = count[j+1];
                                count[j+1] = count[j];
                                count[j] = temp2;

                                int temp3 = lineIndex[j+1];
                                lineIndex[j+1] = lineIndex[j];
                                lineIndex[j] = temp3;
                            }
                        }
                    }



                }

                if ((line = br.readLine()) != null )
                {
                    lineno++;
                    continue;
                } 

                else 
                    bool = false;
            }


        }
        br.close();         



        counter1 = new Text(count[0]+" "+lineIndex[0]+":"+ddos_line);
        counter2 = new Text(count[1]+" "+lineIndex[1]+":"+ddos_line);
        counter3 = new Text(count[2]+" "+lineIndex[2]+":"+ddos_line);
        counter4 = new Text(count[3]+" "+lineIndex[3]+":"+ddos_line);
        counter5 = new Text(count[4]+" "+lineIndex[4]+":"+ddos_line);




      output.collect(srcIP, one);
      output.collect(dstIP, two);
      output.collect(srcPort, three);
      output.collect(dstPort, four);
      output.collect(counter1, five);
      output.collect(counter2, six);
      output.collect(counter3, seven);
      output.collect(counter4, eight);
      output.collect(counter5, nine);
     // output.collect(total_records, total);

      //iterating through all the words available in that line and forming the key value pair

   }

Kindly mention as to what is the reason behind this anomaly.

 

P.S There are no issues with the configuration of the multinode hadoop cluster.

 

Edit: I would like to explain what I am doing in this mapper class. I am having a 100KB file of normal flow record data which contains 1000 flows. There is another flow record of 100MB which contains 1,000,000 flows in it. For every flow from the 100MB file, I am checking(comparing the src IP's, port no.s and the Dst IP's and port no.s) with all of the flows from the 100KB file(to select the closest five flows from the 100KB file for every flow from the 100MB file). Thus, I am performing a total of 1000*1,000,000 similarity check operations. I would like to further stress that both of these files are stored in HDFS and not local system. Kindly suggest as to what is wrong with the code.

 

Thanks

Announcements