Created on 11-27-2015 07:50 AM - edited 09-16-2022 02:50 AM
Hello,
I have a simple Spark RDD[(Int, String)].
The Int is a position, the String is a value. My goal is to produce a simple String from this RDD by concatenating all values. The difficulty is that I need to keep the value position defined by the Int 🙂
Here is my set of values:
val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
Here is my expected String:
"AAA","AAA",999"27-11-2015","AAA","AAA","AA","AAA","A","A","AAA",999,"AAA"
This respects the order defined by the Int key:
1,2,33,83,146,155,160,166,173,180,184,185,186
I'm able to define a new Spark RDD using these inputs.
I can convert it to a pair RDD with the Int as the key (position) and the String as the value.
Then I can easily sort it.
But when I try to extract the RDD values and concatenate them into a string (reduce), I lose my order...
This behavior seems pretty normal, as the data manipulation operations (map/reduce/foreach) are distributed across the Spark cluster.
Is there a way to force a reduce operation not to be distributed?
Here is another try, using Spark SQL, but I get the same issue:
package scala
import java.text.SimpleDateFormat
import java.util.Calendar
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileUtil, FileSystem}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.types.{ StringType, IntegerType, StructField, StructType }
import scala.collection.mutable.ListBuffer
object generator {
  def main(args: Array[String]) {
    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."
    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.default.name", HDFS_NAMENODE)
    val hdfs = FileSystem.get(hadoopConf)
    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER) 
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)
    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple => {
      val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
      Row.fromSeq(row)
    })
    // inputDataRdd is RDD[Row]
    val schema = {
      val srcFields = List("f_int", "f_any").zipWithIndex.map(field => StructField(
        field._1,
        field._2 match {
          case 0 => IntegerType
          case 1 => StringType
        },
        nullable = false))
      StructType(srcFields.toArray)
    }
    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")
    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
/*
+----------+
|     f_any|
+----------+
|       AAA|
|       AAA|
|       999|
|27-11-2015|
|       AAA|
|        AA|
|        AA|
|       AAA|
|         A|
|         A|
|       AAA|
|       999|
|       AAA|
+----------+
*/
    val res = sorted.rdd.reduce((row1, row2) => Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row
    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]
    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")
  }
}
The DataFrame content is well sorted, until I reduce it to a single row...
Here is the HDFS output content; it may vary on each run:
999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA
Another case that troubles me is the following. Let's imagine I want to update the String of each RDD row by setting a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", ...
To do that I would use an integer that is incremented each time it is used.
Such a data manipulation would succeed using RDD.map(), but I may end up with the same number used several times, as each worker increments its own copy of the integer... Isn't it possible, again, to tell the Spark engine that, for this map operation only, the job should not be distributed?
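To make the pitfall concrete, here is a rough sketch of the kind of map I mean (the counter and the "myrow_" labels are purely illustrative):
    // Naive attempt: a counter captured in the closure is copied to each executor,
    // so every task increments its own copy and the same code can come out several
    // times (inputDataRdd is the RDD defined above).
    var counter = 0
    val labelled = inputDataRdd.map { case (pos, _) =>
      counter += 1                // incremented independently on each executor
      (pos, "myrow_" + counter)   // duplicate codes are likely across partitions
    }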
Thanks for your feedback 🙂
Created on 12-03-2015 08:52 AM - edited 12-03-2015 08:54 AM
Use `.coalesce(1, shuffle = true)` to repartition before sorting and reducing. Failing that, use an `aggregate` function that inserts the sorted data at the appropriate point in the list.
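A minimal sketch of the aggregate idea, assuming a small RDD[(Int, String)] for illustration (the variable names are mine): each pair is folded into a position-sorted list, and the final concatenation happens on the driver.
    // sc is an existing SparkContext; the data is only an example
    val data = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))
    val ordered = data.aggregate(List.empty[(Int, String)])(
      (acc, kv)    => (kv :: acc).sortBy(_._1),    // insert into the partition-local list, keeping it sorted by position
      (accA, accB) => (accA ::: accB).sortBy(_._1) // merge the per-partition lists, preserving the order
    )
    val csv = ordered.map(_._2).mkString(",")      // "B,D,A,C"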
Created on 12-14-2015 09:04 AM - edited 12-14-2015 09:08 AM
Hi Srita and thanks for your answers 🙂
For the first one, I cannot use the coalesce tip before sorting, as I just need to concatenate strings in a defined order. I will look at the aggregate function advice.
Here is a simple Spark code sample to reproduce the issue easily, without the Spark SQL stuff:
    // Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
    // Expected output is : "B","D","A","C" (String values concatenated using order defined by Int values)
    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]
    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]
    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]
    val reducedData = sortedData.reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format
    println(reducedData)
    // Expected: B,D,A,C
    // Execution 1 - Result is: A,D,C,B
    // Execution 2 - Result is: A,C,D,B
    // Execution 3 - Result is: C,B,A,D
    // ...
For the second one, I have been advised to use Spark accumulators. The zipWithIndex method seems to be a good alternative.
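For reference, here is a quick sketch of the zipWithIndex idea applied to the sample above (the "myrow_" labels are just for illustration); the index is assigned once across all partitions, so the codes never repeat:
    val indexed = data.sortByKey()        // keep the sequence aligned with the Int order
      .zipWithIndex()                     // RDD[((Int, String), Long)], index starts at 0
      .map { case ((pos, value), idx) => (pos, "myrow_" + (idx + 1), value) }
    indexed.collect().foreach(println)
    // (2,myrow_1,B), (100,myrow_2,D), (166,myrow_3,A), (200,myrow_4,C)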
Created on 12-16-2015 04:55 AM - edited 12-16-2015 04:56 AM
Here is the answer. Thanks again, Srita.
  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
...
  }
Application source code:
    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]
    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]
    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]
    val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format
    // also works using coalesce(1, shuffle = false) or repartition(1)
    println(reducedData)
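For completeness, a driver-side variant that also preserves the order (my own sketch, only suitable when the result fits in driver memory): collect() returns the partitions of the sorted RDD in order, so the concatenation can simply be done with mkString.
    val reducedOnDriver = sortedData.collect().mkString(",")
    println(reducedOnDriver)
    // B,D,A,C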