
Spark: force map/reduce not to be distributed

Contributor

Hello,

 

I have a simple Spark RDD[(Int, String)].

The Int is a position and the String is a value. My goal is to produce a single String from this RDD by concatenating all the values. The difficulty is that I need to keep the order defined by the Int position 🙂

 

Here is my set of values:

val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))

Here is my expected String:

"AAA","AAA",999"27-11-2015","AAA","AAA","AA","AAA","A","A","AAA",999,"AAA"

This respects the order defined by the Int keys:

1,2,33,83,146,155,160,166,173,180,184,185,186

I'm able to define a new Spark RDD using these inputs.

I can convert it to a pair RDD with the Int as the key (position) and the String as the value.

Then I can easily sort it.

But when I try to extract the RDD values and concatenate them into a String (reduce), I lose the order...

 

This behavior seems pretty normal, as the data manipulation operations (map/reduce/foreach) are distributed across the Spark cluster.

 

Is there a way to force a reduce operation not to be distributed?

 

Here is another try, using Spark SQL, but I get the same issue:

package scala

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object generator {
  def main(args: Array[String]) {

    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."

    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.default.name", HDFS_NAMENODE)
    val hdfs = FileSystem.get(hadoopConf)

    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER) 
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)

    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple => {
      val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
      Row.fromSeq(row)
    })
    // inputDataRdd is RDD[Row]

    val schema = {
      val srcFields = List("f_int", "f_any").zipWithIndex.map(field => StructField(
        field._1,
        field._2 match {
          case 0 => IntegerType
          case 1 => StringType
        },
        nullable = false))
      StructType(srcFields.toArray)
    }
    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")

    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
/*
+----------+
|     f_any|
+----------+
|       AAA|
|       AAA|
|       999|
|27-11-2015|
|       AAA|
|        AA|
|        AA|
|       AAA|
|         A|
|         A|
|       AAA|
|       999|
|       AAA|
+----------+
*/

    val res = sorted.rdd.reduce((row1, row2) => Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row

    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]

    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")
  }
}

The DataFrame content is correctly sorted, until I reduce it to a single Row...

 

Here is the HDFS output content; it may vary on each run:

999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA

Another case that troubles me is the following: let's imagine I want to update the String of each RDD row by setting a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", ...

To do that, I would use an integer that is incremented each time it is used.

Such a data manipulation would succeed using RDD.map(), but the same number may end up being used many times, as each worker increments its own copy of the integer... Again, is it possible to tell the Spark engine that, for this map operation only, the job must not be distributed?
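For illustration, here is roughly what I had in mind (just a sketch; the counter is a plain var captured in the map closure):

    // Naive sketch: `counter` is serialized into each task's closure, so every
    // executor increments its own private copy and the generated codes can repeat.
    var counter = 0
    val coded = inputDataRdd.map { case (pos, value) =>
      counter += 1                   // only updates the local copy on the executor
      (pos, s"myrow_$counter")
    }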

 

Thanks for your feedback 🙂


4 REPLIES

New Contributor (accepted solution)

Use `.coalesce(1, shuffle = true)` to repartition before sorting and reducing. Failing that, use an `aggregate` function that inserts the sorted data at the appropriate point in the list.
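Roughly, the aggregate idea could look like this (just a sketch with a toy RDD; it folds every (position, value) pair into a TreeMap, so the merged result stays ordered by the Int key whatever the partitioning):

    // Sketch: fold each pair into a TreeMap (keys kept sorted), merge the
    // partial maps, then join the values on the driver.
    import scala.collection.immutable.TreeMap

    val toy = sc.parallelize(Seq((3, "C"), (1, "A"), (2, "B")))

    val merged = toy.aggregate(TreeMap.empty[Int, String])(
      (acc, kv)    => acc + kv,                   // seqOp: insert one pair
      (acc1, acc2) => acc2.foldLeft(acc1)(_ + _)  // combOp: merge two partial maps
    )

    println(merged.values.mkString(","))          // A,B,C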

New Contributor
For your second question have you tried `zipWithIndex`?
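A rough sketch of what that could look like for the "myrow_N" codes (toy input; zipWithIndex numbers from 0, hence the +1):

    // Sketch: zipWithIndex assigns a unique index based on the RDD's partition
    // order, so the codes are unique across the whole RDD (no shared counter needed).
    val rows = sc.parallelize(Seq("AAA", "AA", "A"))

    val coded = rows
      .zipWithIndex()                                        // RDD[(String, Long)]
      .map { case (value, idx) => (value, s"myrow_${idx + 1}") }

    coded.collect().foreach(println)                         // (AAA,myrow_1) (AA,myrow_2) (A,myrow_3)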

Contributor

Hi Srita and thanks for your answers 🙂

 

For the first one, I cannot use the coalesce tip before sorting, as I just need to concatenate strings in a defined order. I will look at the aggregate function advice.

 

Here is a simple Spark code sample to reproduce the issue easily, without the Spark SQL stuff:

    // Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
    // Expected output is : "B","D","A","C" (String values concatenated using order defined by Int values)

    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]

    val reducedData = sortedData.reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format

    println(reducedData)
    // Expected: B,D,A,C
    // Execution 1 - Result is: A,D,C,B
    // Execution 2 - Result is: A,C,D,B
    // Execution 3 - Result is: C,B,A,D
    // ...

 

For the second one, I have been advised to use Spark accumulators. The zipWithIndex method seems to be a good alternative.

 

Contributor

Here is the answer, thanks again Srita.

 

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

 

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
...
}

Application source code:

    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x => x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) => c})
    // RDD[String]

    val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB).toString
    // String in CSV format
    // also works using coalesce(1, shuffle = false) or repartition(1)

    println(reducedData)
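For the record, a driver-side variant also works when the final result is small enough to collect (just a sketch, not needed for the fix above):

    // Sketch: sortByKey range-partitions the data in key order and collect()
    // returns the partitions in order, so the values can be joined on the driver
    // without forcing everything into a single partition.
    val csv = data
      .sortByKey()          // RDD[(Int, String)] ordered by the Int position
      .values               // RDD[String], still in key order
      .collect()            // Array[String], gathered in partition order
      .mkString(",")

    println(csv)            // B,D,A,C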