Support Questions
Find answers, ask questions, and share your expertise

K-mer scala program improvement in Apache Spark

Highlighted

K-mer scala program improvement in Apache Spark

Explorer

Hello Members, I have executed the following program (K-mer counter) in 2 nodes spark cluster on the data set size 10.2 GB. The amount of time it has taken to run is around 35 minutes. In another experiment, I have executed a Java program for the same K-mer counter problem in Hadoop-2 two nodes cluster. The amount of time it has taken is around 35 minutes. We know that Spark's cluster performance should be greater than Hadoop cluster. But in my case, the running time for both the cluster is same. I would like to know that In Spark cluster, am i utilizing all the available resources in 2 nodes cluster efficiently or not? Also, I would like to know that my Scala program for k-mer counter can be improvised than what i have now?

My hardware is as follows:

CPU: Intel i7 processor with 8 cores in both the machines in cluster, RAM: 8GB in both the machines in cluster, OS: Ubuntu 16.04, Harddisk capacity: Master node 1 TB and Slave node 500 GB, Nodes are connected through switch in the network, Ip address of master: 172.30.16.233, Ip address of slave: 172.30.17.15.

My execution command is as follows

spark-submit --class Kmer1 --master spark://saravanan:7077 --executor-memory 5g /home/hduser/sparkapp/target/scala-2.11/sparkapp_2.11-0.1.jar hdfs://172.30.16.233:54310//input hdfs://172.30.16.233:54310//output

Number of partition spark system has created is 78 and no of executors is 2.

import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    object Kmer1 {
      //
      def main(args: Array[String]): Unit = {
        //
        val sparkConf = new SparkConf().setAppName("Kmer1")
        val sc = new SparkContext(sparkConf)
        val input = args(0)
        val K = 25
        val broadcastK = sc.broadcast(K)
        val records = sc.textFile(input)
        val only_three = records.zipWithIndex.filter{case (_,i)=>(i+1)%4 !=0}.map{case (e,_) =>e}
        // remove the records, which are not an actual sequence data
        val filteredRDD = only_three.filter(line => {
          !( 
             line.startsWith("@") || 
             line.startsWith("+") || 
             line.startsWith(";") ||
             line.startsWith("!") || 
             line.startsWith("~") ||
             line.startsWith(">") 
           )
        })
        val kmers = filteredRDD.flatMap(_.sliding(broadcastK.value, 1).map((_, 1)))
        // find frequencies of kmers
        val kmersGrouped = kmers.reduceByKey(_ + _)
         kmersGrouped.saveAsTextFile(args(1))
        // done!
        sc.stop()
      }
    }