Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

Spark: force map/reduce not to be distributed

Contributor

Hello,

 

I have a simple Spark RDD[(Int, String)].

The Int is a position and the String is a value. My goal is to produce a single String from this RDD by concatenating all values. The difficulty is that I need to keep the order defined by the Int 🙂

 

Here is my set of values:

val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))

Here is my expected String:

"AAA","AAA",999,"27-11-2015","AAA","AA","AA","AAA","A","A","AAA",999,"AAA"

This respects the order defined by the Int keys:

1,2,33,83,146,155,160,166,173,180,184,185,186
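For reference, the expected string can be computed locally with plain Scala collections (no Spark): sort by the Int position, then concatenate. This is only a sketch; the quoting rule (Strings in double quotes, numbers unquoted) is inferred from the expected output above, and `ExpectedOutput`, `render` and `expected` are hypothetical names.

```scala
// Plain Scala collections, no Spark: computes the target string on one machine.
object ExpectedOutput {
  // Same (position, value) pairs as inputDataRdd; values are Strings or Ints.
  val input: Seq[(Int, Any)] = Seq(
    (166, "AAA"), (173, "A"), (180, "A"), (184, "AAA"), (185, 999), (186, "AAA"),
    (1, "AAA"), (2, "AAA"), (33, 999), (83, "27-11-2015"), (146, "AAA"), (155, "AA"), (160, "AA"))

  // Assumed formatting rule: Strings are double-quoted, other values printed as-is.
  def render(value: Any): String = value match {
    case s: String => "\"" + s + "\""
    case other     => other.toString
  }

  // Sort by position first, then concatenate the rendered values in that order.
  def expected(pairs: Seq[(Int, Any)]): String =
    pairs.sortBy(_._1).map { case (_, v) => render(v) }.mkString(",")

  def main(args: Array[String]): Unit =
    println(expected(input))
}
```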

I'm able to define a new Spark RDD using these inputs.

I can convert it to a pair RDD with the Int as the key (position) and the String as the value.

Then I can easily sort it.

When I try to extract the RDD values and concatenate them into a string (reduce), I lose my order...

 

This behavior seems normal, as data manipulation operations (map/reduce/foreach) are distributed across the Spark cluster.

Is there a way to force a reduce operation not to be distributed?
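A likely explanation, with a sketch of a workaround: `RDD.reduce` reduces each partition left to right, but then merges the per-partition results on the driver in the order the tasks finish, so a non-commutative operation such as string concatenation has no fixed order across partitions (the `reduce` scaladoc asks for a commutative and associative operator). `collect()`, by contrast, returns partitions in index order, and `sortByKey` puts the smallest keys in the first partition. The sketch assumes the concatenated data fits in driver memory; `OrderedConcat` and `concatInOrder` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object OrderedConcat {
  // Sorts by position, drops the keys and joins the values on the driver.
  // sortByKey range-partitions the data (smallest keys in partition 0) and
  // collect() concatenates the partitions in index order, so the joined
  // String follows the Int order.
  def concatInOrder(sc: SparkContext, pairs: Seq[(Int, String)]): String =
    sc.parallelize(pairs)
      .sortByKey()
      .values
      .collect()
      .mkString(",")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ordered-concat").setMaster("local[*]"))
    try {
      // Same four pairs as the reproduction code in this thread
      println(concatInOrder(sc, Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))) // prints B,D,A,C
    } finally {
      sc.stop()
    }
  }
}
```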

 

Here is another attempt, using Spark SQL, but I get the same issue:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object generator {
  def main(args: Array[String]): Unit = {

    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."

    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", HDFS_NAMENODE) // "fs.default.name" is the deprecated name of this key
    val hdfs = FileSystem.get(hadoopConf)

    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER) 
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)

    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple => {
      val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
      Row.fromSeq(row)
    })
    // inputDataRdd is RDD[Row]

    // f_int holds the position, f_any the value (already converted to String)
    val schema = StructType(Array(
      StructField("f_int", IntegerType, nullable = false),
      StructField("f_any", StringType, nullable = false)))
    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")

    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
/*
+----------+
|     f_any|
+----------+
|       AAA|
|       AAA|
|       999|
|27-11-2015|
|       AAA|
|        AA|
|        AA|
|       AAA|
|         A|
|         A|
|       AAA|
|       999|
|       AAA|
+----------+
*/

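    // reduce() concatenates within each partition in order, but merges the partition
    // results on the driver as tasks finish, so the sort order can be lost here
    val res = sorted.rdd.reduce((row1, row2) => Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row

    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]

    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")

    sc.stop()
  }
}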

The DataFrame content is correctly sorted, until I reduce it to a single row...

Here is the HDFS output content; it may differ on each run:

999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA
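The same idea applies to the Spark SQL attempt: after `order by`, `collect()` on the DataFrame returns the rows in sorted order, so the concatenation can happen on the driver and the single String can then be written as one record. A minimal sketch, assuming the one-column sorted DataFrame from the code above and that the result fits in driver memory; `SortedDataFrameConcat`, `concatSorted` and `saveAsSingleLine` are hypothetical names.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

object SortedDataFrameConcat {
  // `sorted` is assumed to be a one-column DataFrame that is already ordered,
  // e.g. sqlContext.sql("select f_any from input_dataframe order by f_int").
  // collect() returns the rows to the driver in that order, so joining them
  // there keeps the f_int order.
  def concatSorted(sorted: DataFrame): String =
    sorted.collect().map(_.getString(0)).mkString(",")

  // Writes the String as a single line; with one partition the output directory
  // contains a single part-00000 file (plus Hadoop's _SUCCESS marker).
  def saveAsSingleLine(sc: SparkContext, line: String, path: String): Unit =
    sc.parallelize(Seq(line), 1).saveAsTextFile(path)
}
```

In the original program this would replace the `reduce` and `parallelize` steps, e.g. `saveAsSingleLine(sc, concatSorted(sorted), HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")`.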

Another case that troubles me is the following. Let's imagine I want to update the String of each RDD row with a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", ...

To do that, I would use an integer that is incremented each time it is used.

This works with RDD.map(), but the same number may be used several times, because each worker increments its own copy of the integer... Again, isn't it possible to tell the Spark engine that, for this map operation only, the job must not be distributed?

 

Thanks for your feedback 🙂

1 ACCEPTED SOLUTION

Visitor

Use `.coalesce(1, shuffle = true)` to repartition before sorting and reducing. Failing that, use an `aggregate` function that inserts the sorted data at the appropriate point in the list.
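One way to read the `aggregate` suggestion, as a sketch: build a key-sorted map inside each partition and merge the maps, so the combined result does not depend on the order in which partitions finish. This assumes positions are unique (a duplicate key keeps only one value) and that all pairs fit in driver memory; `AggregateConcat` and `concatByPosition` are hypothetical names.

```scala
import scala.collection.immutable.TreeMap
import org.apache.spark.rdd.RDD

object AggregateConcat {
  def concatByPosition(data: RDD[(Int, String)]): String = {
    val merged = data.aggregate(TreeMap.empty[Int, String])(
      // seqOp: add one (position, value) pair to the partition's sorted map
      (acc, kv) => acc.updated(kv._1, kv._2),
      // combOp: merge two partial maps; TreeMap keeps its keys sorted, so the
      // order in which partition results arrive does not matter
      (left, right) => right.foldLeft(left) { case (m, (k, v)) => m.updated(k, v) }
    )
    // TreeMap.values iterates in ascending key order
    merged.values.mkString(",")
  }
}
```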


Visitor
For your second question have you tried `zipWithIndex`?
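A minimal sketch of the `zipWithIndex` suggestion for the "myrow_N" codes. `zipWithIndex` assigns consecutive 0-based `Long` indices following partition order and then element order within each partition, and it runs an extra Spark job to count partition sizes when the RDD has more than one partition. The sketch keeps the original value next to its code; `RowCodes` and `withRowCodes` are hypothetical names.

```scala
import org.apache.spark.rdd.RDD

object RowCodes {
  // Pairs each value with a code "myrow_<n>", n = 1, 2, 3, ...
  // zipWithIndex numbers elements by partition index first, then by position
  // inside each partition, so the codes follow the RDD's current order
  // (for example the order produced by sortByKey).
  def withRowCodes(values: RDD[String]): RDD[(String, String)] =
    values.zipWithIndex().map { case (value, idx) => ("myrow_" + (idx + 1), value) }
}
```

If the codes only need to be unique rather than consecutive, `zipWithUniqueId()` avoids the extra job but leaves gaps between partitions.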

Contributor

Hi Srita, and thanks for your answers 🙂

For the first one, I cannot use the coalesce tip before sorting, as I just need to concatenate strings in a defined order. I will look at the aggregate function advice.

Here is a simple Spark code sample that reproduces the issue, without the Spark SQL parts:

    // Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
    // Expected output: B,D,A,C (String values concatenated in the order defined by the Int values)

    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]

    val reducedData = sortedData.reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format

    println(reducedData)
    // Expected: B,D,A,C
    // Execution 1 - Result is: A,D,C,B
    // Execution 2 - Result is: A,C,D,B
    // Execution 3 - Result is: C,B,A,D
    // ...

 

For the second one, I have been advised to use Spark accumulators. The zipWithIndex method seems to be a good alternative.

 

Contributor

Here is the answer; thanks again, Srita.

 

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }


  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
...
}
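To check how the sorted data is laid out before and after `coalesce` in a given run, `glom()` can expose the contents of each partition. A small inspection sketch; `PartitionInspection` and `partitionsOf` are hypothetical names, and the exact partition boundaries depend on the range bounds Spark samples, so no output is shown.

```scala
import org.apache.spark.rdd.RDD

object PartitionInspection {
  // glom() turns each partition into an array; collect() returns those arrays
  // in partition-index order, so the outer Seq mirrors the partition layout.
  def partitionsOf(rdd: RDD[String]): Seq[Seq[String]] =
    rdd.glom().collect().map(_.toSeq).toSeq
}
```

For example, with `sorted = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")), 4).sortByKey().values`, `partitionsOf(sorted)` shows each partition holding a contiguous key range, and `partitionsOf(sorted.coalesce(1))` shows the single merged partition.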

Application source code:

    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]

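    val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format
    // also works using coalesce(1, shuffle = false) or repartition(1)
    // note: the RDD API does not document coalesce/repartition as order-preserving,
    // so the order here is observed behavior rather than a guarantee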

    println(reducedData)
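A variant that makes the single partition explicit, as a sketch: `sortByKey` accepts a `numPartitions` argument, and its documentation states that each partition contains a sorted range of the elements, so sorting straight into one partition and joining that partition's iterator keeps the key order without relying on how `reduce` merges results. All data then passes through one task, which suits small inputs like this one but not large ones. `SinglePartitionConcat` and `concatInOnePartition` are hypothetical names.

```scala
import org.apache.spark.rdd.RDD

object SinglePartitionConcat {
  // Sort directly into a single partition, then join that partition's values
  // in iterator (sorted) order. The resulting RDD holds exactly one String.
  def concatInOnePartition(data: RDD[(Int, String)]): RDD[String] =
    data.sortByKey(ascending = true, numPartitions = 1)
      .values
      .mapPartitions(values => Iterator(values.mkString(",")))
}
```

Calling `saveAsTextFile` on the result writes the ordered line to a single part file, without collecting the data on the driver.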