Member since: 08-14-2015
Posts: 24
Kudos Received: 2
Solutions: 4
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 17134 | 09-01-2015 12:19 AM
 | 5502 | 09-01-2015 12:04 AM
 | 87831 | 08-14-2015 07:45 AM
 | 14843 | 08-14-2015 07:42 AM
12-16-2015 04:55 AM
Here is the answer, thanks again Srita.

/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
...
}

Application source code:

val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
// RDD[(Int, String)]
val keyedSortedData = data.keyBy(x => x._1).sortByKey()
// RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
// RDD[String]
val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB).toString
// String in CSV format
// also works using coalesce(1, shuffle = false) or repartition(1)
println(reducedData)
09-19-2015 06:40 AM
Replication is an HDFS-level configuration. It isn't something you configure from Spark, and you don't have to worry about it from Spark. AFAIK you set a global replication factor, but you can also set it per file (and recursively per directory). I think you want to pursue this via HDFS.
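If you do want to change it for specific data, here is a minimal sketch (assuming a live SparkContext named sc and a hypothetical file path) using the Hadoop FileSystem API, which is the layer that actually owns the setting:

import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse the Hadoop configuration Spark already carries for the cluster
val fs = FileSystem.get(sc.hadoopConfiguration)

// Change the replication factor of one existing file (path is hypothetical);
// the same thing can be done from the shell with: hdfs dfs -setrep 2 /user/greg/myfile.txt
fs.setReplication(new Path("/user/greg/myfile.txt"), 2.toShort)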
09-01-2015 12:19 AM
I made a choice: Spark-JobServer. This project matches my needs almost exactly: it allows sharing RDDs between applications because they share a context. It supports Spark SQL/Hive contexts. And it works without having to install a new component on every cluster node 🙂
09-01-2015 12:04 AM
To summarize: one may use Spark SQL (or Hive) to write SQL queries with complex joins. Otherwise, with the core Spark API, one can and must describe the execution plan explicitly, writing each join separately.
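To illustrate the difference, here is a minimal made-up sketch (assuming a live SparkContext named sc; this is not code from the original question):

import org.apache.spark.rdd.RDD

// Made-up datasets: (customerId, name) and (customerId, amount)
val customers: RDD[(Int, String)] = sc.parallelize(Seq((1, "Alice"), (2, "Bob")))
val orders: RDD[(Int, Double)] = sc.parallelize(Seq((1, 10.0), (1, 25.0), (2, 5.0)))

// Core Spark: each join (and any further joins) is written explicitly, step by step
val joined: RDD[(Int, (String, Double))] = customers.join(orders)
joined.collect().foreach(println)   // e.g. (1,(Alice,10.0)), (1,(Alice,25.0)), (2,(Bob,5.0))

// With Spark SQL (or Hive) the same result would be one declarative query,
// e.g. SELECT c.name, o.amount FROM customers c JOIN orders o ON c.id = o.customerId,
// and the optimizer decides how to execute the joins.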
08-20-2015 08:16 AM
Hello, I still need to dig into this, but I will also check MapFiles, which are indexed SequenceFiles. I'll provide feedback then 🙂 Greg.
08-14-2015 07:42 AM
Hi,

To create a pair RDD from an RDD, I used the "keyBy" transformation to extract the key from each value:

val fileC = sc.textFile("hdfs://.../user/.../myfile.txt")
.keyBy(line => line.substring(5,13).trim())
.mapValues(line => ( line.substring(87,92).trim()
, line.substring(99,112).trim()
, line.substring(120,126).trim()
, line.substring(127,131).trim()
)
) The "keyBy" provides me a new pair-RDD for which the key is a substring of my text value. Then the "mapValues" transformations opers like a "map" one on each value of my pair-RDD, not on keys...
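As a tiny self-contained illustration of the same pattern (made-up data, assuming a live SparkContext named sc, not the original file layout):

val words = sc.parallelize(Seq("apple", "banana", "cherry"))

// keyBy builds the key from each value: here the first letter of each word
val pairs = words.keyBy(w => w.substring(0, 1))      // RDD[(String, String)]

// mapValues transforms only the value side; the keys are left untouched
val lengths = pairs.mapValues(w => w.length)         // RDD[(String, Int)]

lengths.collect().foreach(println)                   // (a,5), (b,6), (c,6)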