Created 01-14-2017 11:12 PM
I have been trying to index (rank) the elements of a grouped RDD. The RDD was grouped by key, as you can see below, and I want to add an index, starting from 2, to each element of the array under every key.
What I have
scala.collection.immutable.Map[String,Array[(String, String, String)]] = Map(
  394 -> Array((394,394,0), (394,362,7), (394,368,7)),
  328 -> Array((328,328,0), (328,324,7), (328,325,7), (328,326,7), (328,327,7), (328,329,7)),
  368 -> Array((368,368,0), (368,394,7), (368,396,7), (368,397,7), (368,479896,7)),
  278 -> Array((278,278,0), (278,371,7), (278,372,7))
)
What I want (notice the new 4th element of each tuple; it is an index starting from 2 within each array)
394 -> Array((394,394,0,2), (394,362,7,3), (394,368,7,4)),
328 -> Array((328,328,0,2), (328,324,7,3), (328,325,7,4), (328,326,7,5), (328,327,7,6), (328,329,7,7)),
368 -> Array((368,368,0,2), (368,394,7,3), (368,396,7,4), (368,397,7,5), (368,479896,7,6)),
278 -> Array((278,278,0,2), (278,371,7,3), (278,372,7,4))
Created 01-23-2017 03:11 PM
Use the mapValues API. I made an example below of doing what you wanted. You'll have to update the ListBuffer to use the types you have, but it looks like it's doing what you want.
import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
// Pair every value in each group with an index that starts at 2
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- 0 to b.size - 1) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})

indxRdd.collectAsMap()
res8: scala.collection.Map[Int,Array[(Int, Int)]] = Map(
  2 -> Array((3,2), (4,3)),
  1 -> Array((2,2), (3,3))
)
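For the (String, String, String) tuples in the question, the same idea can probably be written more compactly with zipWithIndex. This is only a minimal sketch: the names IndexGroups, indexGroup and start are hypothetical and not from the thread, and the demo runs on a plain local Map instead of an RDD so it needs no SparkContext. On a grouped RDD, the helper would presumably be passed to mapValues in the same way. Note that groupByKey does not guarantee the order of values within a group, so if the index is meant to be a rank, sort the values inside the helper first.

```scala
object IndexGroups {
  // Hypothetical helper: append a running index, beginning at `start`,
  // as a 4th element of every triple in one group.
  def indexGroup(
      rows: Iterable[(String, String, String)],
      start: Int = 2
  ): Array[(String, String, String, Int)] =
    rows.zipWithIndex.map { case ((a, b, c), i) => (a, b, c, i + start) }.toArray

  def main(args: Array[String]): Unit = {
    // Local stand-in for the grouped data shown in the question (two keys only).
    val grouped = Map(
      "394" -> Seq(("394", "394", "0"), ("394", "362", "7"), ("394", "368", "7")),
      "278" -> Seq(("278", "278", "0"), ("278", "371", "7"), ("278", "372", "7"))
    )

    // With Spark this step would be: groupedRdd.mapValues(rows => indexGroup(rows))
    val indexed = grouped.map { case (key, rows) => key -> indexGroup(rows) }

    indexed.foreach { case (key, rows) =>
      println(s"$key -> ${rows.mkString("Array(", ", ", ")")}")
    }
  }
}
```

Because the index is computed per group, every key restarts at 2, which matches the desired output in the question.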
Created 01-16-2017 12:45 AM
Created 01-23-2017 03:35 PM
Thank you so much!