Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

Index/Rank a Grouped Rdd in Spark Scala

Expert Contributor

I am trying to index/rank a grouped RDD. The RDD was grouped by key, as shown below, and I want to add an index, starting from 2, to each element of the array under each key.

What I have

scala.collection.immutable.Map[String,Array[(String, String, String)]] = 
Map( 
        394 -> Array((394,394,0), (394,362,7), (394,368,7)), 
        328 -> Array((328,328,0), (328,324,7), (328,325,7), (328,326,7), (328,327,7), (328,329,7)),
        368 -> Array((368,368,0), (368,394,7), (368,396,7), (368,397,7), (368,479896,7)),
        278 -> Array((278,278,0), (278,371,7), (278,372,7))
)

What I want (notice the new 4th element of each tuple; it's an index starting from 2)

        394 -> Array((394,394,0,2), (394,362,7,3), (394,368,7,4)), 
        328 -> Array((328,328,0,2), (328,324,7,3), (328,325,7,4), (328,326,7,5), (328,327,7,6), (328,329,7,7)),
        368 -> Array((368,368,0,2), (368,394,7,3), (368,396,7,4), (368,397,7,5), (368,479896,7,6)),
        278 -> Array((278,278,0,2), (278,371,7,3), (278,372,7,4))
  
1 ACCEPTED SOLUTION

Super Collaborator

Use the mapValues API. I made an example below that does what you described. You'll have to update the ListBuffer to use the types you have, but it looks like it's doing what you want.

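import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
// Attach a running index, starting at 2, to every value under each key
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- b.indices) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})
// Collect once, on the RDD itself, to bring the result back to the driver
indxRdd.collectAsMap()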
res8: scala.collection.Map[Int,Array[(Int, Int)]] = 
Map(
	2 -> Array((3,2), (4,3)), 
	1 -> Array((2,2), (3,3))
)
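
For the tuple type in the question, the same idea can probably be written more compactly with zipWithIndex. The following is only a sketch in plain Scala, without a SparkContext, so it can be run on its own; the object name IndexGrouped and the helper indexFrom are made up for illustration and are not part of Spark or of the original answer.

```scala
object IndexGrouped {
  // Hypothetical helper: appends a running index (starting at `start`)
  // as a 4th element to every triple. It takes an Iterable because that
  // is the value type groupByKey produces.
  def indexFrom(rows: Iterable[(String, String, String)],
                start: Int = 2): Array[(String, String, String, Int)] =
    rows.zipWithIndex.map { case ((a, b, c), i) => (a, b, c, i + start) }.toArray

  def main(args: Array[String]): Unit = {
    // Two of the groups from the question, held in a local Map
    val grouped: Map[String, Array[(String, String, String)]] = Map(
      "394" -> Array(("394", "394", "0"), ("394", "362", "7"), ("394", "368", "7")),
      "278" -> Array(("278", "278", "0"), ("278", "371", "7"), ("278", "372", "7"))
    )

    val indexed = grouped.map { case (k, rows) => k -> indexFrom(rows) }

    indexed.foreach { case (k, rows) =>
      println(s"$k -> ${rows.mkString("Array(", ", ", ")")}")
    }
  }
}
```

On an actual grouped RDD (say one named gRdd, as in the answer above), the helper would be passed to mapValues, for example gRdd.mapValues(rows => IndexGrouped.indexFrom(rows)). Keep in mind that groupByKey does not guarantee the order of the values within a group, so if the index has to follow a specific order, sort the values inside the helper before indexing them.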


3 REPLIES

Expert Contributor

Thank you so much!