
How to create a Spark PairRDD in Scala?

Rising Star

I am trying to verify cogroup, join, and groupByKey on PairRDDs. I could do this with the Spark Java API, but I cannot get it to work in a Scala project.

 

Below is the simple code I tried; let me know where I made a mistake.

 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings the PairRDDFunctions implicits into scope (needed on Spark < 1.3)
import org.apache.spark.rdd.RDD

object PairsCheck {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val lines = sc.textFile("/home/test1.txt")
    val lines2 = sc.textFile("/home/test2.txt")

    val words = lines.flatMap(x => x.split("\\W+"))
    val words2 = lines2.flatMap(x => x.split("\\W+"))

    val pairs: RDD[(Int, String)] = words.map(x => (x.length, x))
    val pairs2: RDD[(Int, String)] = words2.map(x => (x.length, x))

    // --> Here I tried to call the join/cogroup functions that apply to pair RDDs,
    // but could not do that. If I call join, it throws an error.
  }
}

 

Thank you in advance.

1 ACCEPTED SOLUTION

Contributor

Hi,

 

To create a pair RDD from an RDD, I used the "keyBy" transformation to extract the key from each value:

val fileC = sc.textFile("hdfs://.../user/.../myfile.txt")
  .keyBy(line => line.substring(5, 13).trim())
  .mapValues(line => (
    line.substring(87, 92).trim(),
    line.substring(99, 112).trim(),
    line.substring(120, 126).trim(),
    line.substring(127, 131).trim()
  ))

The "keyBy" transformation gives me a new pair RDD whose key is a substring of each text value.

Then the "mapValues" transformation operates like "map" on each value of the pair RDD, leaving the keys untouched.
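Once you have two pair RDDs keyed the same way, join and cogroup become available through the PairRDDFunctions implicits. Here is a minimal sketch of that (the object name, sample words, and local master setting are made up for illustration; on Spark < 1.3 the `SparkContext._` import is what brings the implicits into scope):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // PairRDDFunctions implicits (Spark < 1.3)

object JoinCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("JoinCheck"))

    // Two small pair RDDs keyed the same way (word length as key),
    // mirroring the keyBy approach above
    val left  = sc.parallelize(Seq("apple", "fig", "kiwi")).keyBy(_.length)
    val right = sc.parallelize(Seq("pear", "plum", "date")).keyBy(_.length)

    // join keeps only keys present on both sides
    val joined = left.join(right).collect()
    joined.foreach(println)

    // cogroup keeps every key, grouping the values from each side
    val grouped = left.cogroup(right).collect()
    grouped.foreach(println)

    sc.stop()
  }
}
```

With this sample data only the length-4 words ("kiwi" on the left; "pear", "plum", "date" on the right) share a key, so join emits pairs only for key 4, while cogroup also emits keys 5 and 3 with one empty side each.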

