12-16-2015 04:55 AM
Here is the answer, thanks again Srita. In the Spark source, `repartition` is just `coalesce` with `shuffle = true`:

```scala
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  ...
}
```

Application source code:

```scala
// Assumes `sc` is an existing SparkContext (predefined in spark-shell).
val data = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))
// RDD[(Int, String)]
val keyedSortedData = data.keyBy(_._1).sortByKey()
// RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map { case (_, (_, value)) => value }
// RDD[String]
val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB)
// String in CSV format
// Also works using coalesce(1, shuffle = false) or repartition(1).
// Caveat: Spark does not guarantee element order within a partition after a shuffle,
// and reduce expects a commutative function; sortedData.collect().mkString(",")
// keeps the partition order produced by sortByKey.
println(reducedData)
```
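To make the `coalesce` doc comment above concrete, here is a small Spark-free model in plain Scala. It is only a sketch under stated assumptions: each "partition" is a `Vector`, the object name `CoalesceModel` and both functions are hypothetical, the narrow case groups contiguous ranges of parent partitions, and the shuffled case hash-partitions on a running element position. It illustrates the documented behaviour (1000 → 100 claims 10 parents each; a shuffle can grow and rebalance skewed partitions) but is not Spark's actual implementation.

```scala
// Hypothetical, Spark-free model: each "partition" is a Vector of elements.
object CoalesceModel {

  // Narrow coalesce (shuffle = false): each output partition claims a contiguous
  // range of parent partitions, so no element moves between groups.
  // Without a shuffle the count cannot grow, so the target is capped at the parent count.
  def narrowCoalesce[T](parents: Vector[Vector[T]], target: Int): Vector[Vector[T]] = {
    require(target > 0, "target must be positive")
    val n = parents.length
    val t = math.min(target, n)
    Vector.tabulate(t) { i =>
      val start = (i.toLong * n / t).toInt
      val end = ((i + 1).toLong * n / t).toInt
      parents.slice(start, end).flatten
    }
  }

  // Shuffled coalesce (shuffle = true): every element gets a running position and is
  // hash-partitioned on it, so the target may exceed the parent count and skew evens out.
  def shuffledCoalesce[T](parents: Vector[Vector[T]], target: Int): Vector[Vector[T]] = {
    require(target > 0, "target must be positive")
    val byBucket: Map[Int, Vector[T]] =
      parents.flatten.zipWithIndex.groupMap { case (_, pos) => Math.floorMod(pos.hashCode, target) } {
        case (x, _) => x
      }
    Vector.tabulate(target)(p => byBucket.getOrElse(p, Vector.empty[T]))
  }

  def main(args: Array[String]): Unit = {
    // 1000 parent partitions, each holding its own index.
    val thousand = Vector.tabulate(1000)(i => Vector(i))
    val hundred = narrowCoalesce(thousand, 100)
    println(s"${hundred.length} partitions, ${hundred.head.length} parents each")

    // 100 parent partitions, the first 3 abnormally large.
    val skewed = Vector.tabulate(100)(i => Vector.fill(if (i < 3) 5000 else 10)(i))
    val sizes = shuffledCoalesce(skewed, 1000).map(_.length)
    println(s"${sizes.length} partitions, sizes ${sizes.min} to ${sizes.max}")
  }
}
```

In this model the first run prints `100 partitions, 10 parents each`, and the skewed input (15970 elements) is spread into 1000 partitions of 15 or 16 elements. Spark's real shuffle path differs in detail, so treat the exact sizes as a property of the model only.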
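Because the `reduce` in the application code concatenates strings, which is not commutative, its result depends on the order in which elements are combined. A local reference for what the job is expected to print can be computed with plain Scala collections; this is a sketch, and the `CsvReference` object and `toCsv` helper are hypothetical names used only for illustration.

```scala
// Hypothetical helper: the same pipeline as the Spark job, on a plain Scala Seq.
object CsvReference {
  // Sort by the Int key, keep the String value, and join with commas.
  def toCsv(data: Seq[(Int, String)]): String =
    data.sortBy(_._1).map(_._2).mkString(",")

  def main(args: Array[String]): Unit = {
    val data = Seq((166, "A"), (2, "B"), (200, "C"), (100, "D"))
    println(toCsv(data)) // prints B,D,A,C
  }
}
```

Comparing the Spark output against this string is a quick way to check that the sorted order survived the `coalesce` or `repartition` step.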