Index/Rank a Grouped Rdd in Spark Scala

Expert Contributor

I am trying to index/rank a grouped RDD. The RDD is grouped by key, as shown below, and I want to add an index, starting from 2, to every element of the array under each key.

What I have

scala.collection.immutable.Map[String,Array[(String, String, String)]] = 
Map( 
        394 -> Array((394,394,0), (394,362,7), (394,368,7)), 
        328 -> Array((328,328,0), (328,324,7), (328,325,7), (328,326,7), (328,327,7), (328,329,7)),
        368 -> Array((368,368,0), (368,394,7), (368,396,7), (368,397,7), (368,479896,7)),
        278 -> Array((278,278,0), (278,371,7), (278,372,7))
)

What I want (notice the new 4th element of each tuple; it's an index starting from 2)

        394 -> Array((394,394,0,2), (394,362,7,3), (394,368,7,4)), 
        328 -> Array((328,328,0,2), (328,324,7,3), (328,325,7,4), (328,326,7,5), (328,327,7,6), (328,329,7,7)),
        368 -> Array((368,368,0,2), (368,394,7,3), (368,396,7,4), (368,397,7,5), (368,479896,7,6)),
        278 -> Array((278,278,0,2), (278,371,7,3), (278,372,7,4))
  
1 ACCEPTED SOLUTION

Super Collaborator

Use the mapValues API. I put together an example below that does what you asked for. You'll have to update the ListBuffer to use your own types, but it looks like it does what you want.

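import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
// Append a running index, starting at 2, to every value under each key
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- b.indices) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})
// Collect once, on the RDD itself, to bring the result back to the driver
indxRdd.collectAsMap()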
res8: scala.collection.Map[Int,Array[(Int, Int)]] = 
Map(
	2 -> Array((3,2), (4,3)), 
	1 -> Array((2,2), (3,3))
)
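
For the (String, String, String) tuples in the question, the same idea can probably be written more compactly with zipWithIndex. This is only a sketch: addIndex is a hypothetical helper name, and a plain Scala Map stands in for the grouped data so the snippet runs without a Spark context.

```scala
// Hypothetical helper (not from the original answer): appends a running
// index, starting at `start`, as a 4th element of each triple.
def addIndex(rows: Array[(String, String, String)],
             start: Int = 2): Array[(String, String, String, Int)] =
  rows.zipWithIndex.map { case ((a, b, c), i) => (a, b, c, i + start) }

// Plain Map standing in for part of the grouped data shown in the question.
val grouped: Map[String, Array[(String, String, String)]] = Map(
  "394" -> Array(("394", "394", "0"), ("394", "362", "7"), ("394", "368", "7")),
  "278" -> Array(("278", "278", "0"), ("278", "371", "7"), ("278", "372", "7"))
)

// Apply the helper to the values under every key.
val indexed = grouped.map { case (k, rows) => k -> addIndex(rows) }
```

On a grouped pair RDD, the same helper could be passed to mapValues, for example groupedRdd.mapValues(rows => addIndex(rows.toArray)), where groupedRdd is assumed to be the result of groupByKey on your data.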


Expert Contributor

Thank you so much!