Created on 11-27-2015 07:50 AM - edited 09-16-2022 02:50 AM
Hello,
I have a simple Spark RDD[(Int, String)].
The Int is a position, the String is a value. My goal is to produce a simple String from this RDD by concatenating all values. The difficulty is that I need to keep the value position defined by the Int 🙂
Here is my set of values:
val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
Here is my expected String:
"AAA","AAA",999"27-11-2015","AAA","AAA","AA","AAA","A","A","AAA",999,"AAA"
which respects the order defined by the Int key:
1,2,33,83,146,155,160,166,173,180,184,185,186
I'm able to define a new Spark RDD using these inputs.
I can convert it to a pair RDD with the Int as the key (position) and the String as the value.
Then I can easily sort it.
But when I try to extract the RDD values and concatenate them into a string (reduce), I lose my order...
This behavior seems pretty normal, as the data manipulation operations (map/reduce/foreach) are distributed throughout the Spark cluster.
Is there a way to force a reduce operation not to be distributed?
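For reference, here is a minimal sketch of a driver-side workaround, assuming the concatenated result is small enough to collect on the driver (I would prefer to stay within distributed operations):

// Re-using the inputDataRdd defined above: sort by the Int key on the cluster,
// then collect the sorted values and concatenate them on the driver.
val csv = inputDataRdd
  .sortByKey()        // distributed sort on the Int position
  .values             // keep only the values
  .collect()          // bring the (small) sorted result back to the driver
  .mkString(",")      // order is preserved from here on

println(csv)          // AAA,AAA,999,27-11-2015,AAA,AA,AA,AAA,A,A,AAA,999,AAA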
Here is another try, using Spark SQL, but I get the same issue:
package scala

import java.text.SimpleDateFormat
import java.util.Calendar
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileUtil, FileSystem}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ListBuffer

object generator {
  def main(args: Array[String]) {
    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."

    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.default.name", HDFS_NAMENODE)
    val hdfs = FileSystem.get(hadoopConf)

    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER)
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)

    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple => {
        val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
        Row.fromSeq(row)
      }) // inputDataRdd is RDD[Row]

    val schema = {
      val srcFields = List("f_int", "f_any").zipWithIndex.map(field =>
        StructField(
          field._1,
          field._2 match {
            case 0 => IntegerType
            case 1 => StringType
          },
          nullable = false))
      StructType(srcFields.toArray)
    }

    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")

    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
    /*
    +----------+
    |     f_any|
    +----------+
    |       AAA|
    |       AAA|
    |       999|
    |27-11-2015|
    |       AAA|
    |        AA|
    |        AA|
    |       AAA|
    |         A|
    |         A|
    |       AAA|
    |       999|
    |       AAA|
    +----------+
    */

    val res = sorted.rdd.reduce((row1, row2) => Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row
    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]
    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")
  }
}
The DataFrame content is correctly sorted, until I reduce it to a single row...
Here is the HDFS output content; it may vary on each run:
999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA
Another case that troubles me is the following. Let's imagine I want to update the String of each RDD row by setting a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", ...
To do that, I would use an integer that is incremented each time it is used.
Such a data manipulation would succeed using RDD.map(), but I may end up with the same number being used many times, as each worker will increment its own integer value... Isn't it possible, again, to tell the Spark engine that, for this map operation only, I do not want the job to be distributed?
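To illustrate what I mean, here is a minimal sketch with made-up values: a counter captured in the map closure is copied into every task, so the sequence restarts in each partition and the same code can be produced several times.

var counter = 0  // defined on the driver, but each task works on its own copy

val labelled = sc.parallelize(Seq("AAA", "AA", "999"), numSlices = 3)
  .map { value =>
    counter += 1                      // increments the task-local copy only
    (s"myrow_$counter", value)
  }

labelled.collect().foreach(println)
// With 3 partitions, every element can come out as "myrow_1":
// (myrow_1,AAA)
// (myrow_1,AA)
// (myrow_1,999)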
Thanks for your feedback 🙂
Created on 12-03-2015 08:52 AM - edited 12-03-2015 08:54 AM
Use `.coalesce(1, shuffle = true)` to repartition before sorting and reducing. Failing that, use an `aggregate` function that inserts the sorted data at the appropriate point in the list.
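For the aggregate route, here is a minimal sketch using made-up values (the TreeMap is just one possible shape, and it assumes the merged result fits in driver memory): each partition inserts its (position, value) pairs into a sorted map, the per-partition maps are merged on the driver, and the values are joined at the end.

import scala.collection.immutable.TreeMap

val data = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))

val ordered = data.aggregate(TreeMap.empty[Int, String])(
  (acc, kv) => acc + kv,                       // seqOp: insert each (position, value) pair into the sorted map
  (acc1, acc2) => acc2.foldLeft(acc1)(_ + _)   // combOp: merge two sorted maps
)

println(ordered.values.mkString(","))          // B,D,A,C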
Created on 12-14-2015 09:04 AM - edited 12-14-2015 09:08 AM
Hi Srita and thanks for your answers 🙂
For the first one, I cannot use the coalesce tip before sorting, as I just need to concatenate the strings in a defined order. I will look at the aggregate function advice.
Here is a simple Spark code sample to reproduce the issue easily, without the Spark SQL stuff:
// Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
// Expected output is: "B","D","A","C" (String values concatenated using the order defined by the Int values)
val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))           // RDD[(Int, String)]
val keyedSortedData = data.keyBy(x => x._1).sortByKey()                         // RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map({ case (a, (b, c)) => c })                 // RDD[String]
val reducedData = sortedData.reduce((valueA, valueB) => valueA + "," + valueB)  // String in CSV format
println(reducedData)
// Expected: B,D,A,C
// Execution 1 - Result is: A,D,C,B
// Execution 2 - Result is: A,C,D,B
// Execution 3 - Result is: C,B,A,D
// ...
For the second one, I have been advised to use Spark accumulators. The zipWithIndex method seems to be a good alternative.
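For the record, here is a minimal zipWithIndex sketch for the "myrow_N" numbering (row values are made up): the index follows the partition order, so no shared counter is needed.

val rows = sc.parallelize(Seq("AAA", "AA", "999"))

val numbered = rows.zipWithIndex()                                 // RDD[(String, Long)], index follows partition order
  .map { case (value, index) => (s"myrow_${index + 1}", value) }   // build 1-based codes

numbered.collect().foreach(println)
// (myrow_1,AAA)
// (myrow_2,AA)
// (myrow_3,999)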
Created on 12-16-2015 04:55 AM - edited 12-16-2015 04:56 AM
Here is the answer, thanks again Srita.
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  ...
}
Application source code:
val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))   // RDD[(Int, String)]
val keyedSortedData = data.keyBy(x => x._1).sortByKey()                 // RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map({ case (a, (b, c)) => c })         // RDD[String]
val reducedData = sortedData
  .coalesce(1, shuffle = true)                                          // also works using coalesce(1, shuffle = false) or repartition(1)
  .reduce((valueA, valueB) => valueA + "," + valueB)                    // String in CSV format
println(reducedData)                                                    // B,D,A,C