<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Spark: force map/reduce not to be distributed in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34506#M11116</link>
    <description></description>
    <pubDate>Fri, 16 Sep 2022 09:50:39 GMT</pubDate>
    <dc:creator>Grg</dc:creator>
    <dc:date>2022-09-16T09:50:39Z</dc:date>
    <item>
      <title>Spark: force map/reduce not to be distributed</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34506#M11116</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I have a simple Spark RDD[(Int, String)].&lt;/P&gt;&lt;P&gt;The Int is a position and the String is a value. My goal is to produce a single String from this RDD by concatenating all values. The difficulty is that the values must keep the positions defined by the Int &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is my set of values:&lt;/P&gt;&lt;PRE&gt;val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))&lt;/PRE&gt;&lt;P&gt;Here is my expected String:&lt;/P&gt;&lt;PRE&gt;"AAA","AAA",999,"27-11-2015","AAA","AA","AA","AAA","A","A","AAA",999,"AAA"&lt;/PRE&gt;&lt;P&gt;This respects the order defined by the Int key:&lt;/P&gt;&lt;PRE&gt;1,2,33,83,146,155,160,166,173,180,184,185,186&lt;/PRE&gt;&lt;P&gt;I'm able to define a new Spark RDD using these inputs.&lt;/P&gt;&lt;P&gt;I can convert it to a pair RDD with the Int as the key (position) and the String as the value.&amp;nbsp;&lt;/P&gt;&lt;P&gt;Then I can easily sort it.&lt;/P&gt;&lt;P&gt;When I extract the RDD values and concatenate them into a string (reduce), I lose the order...&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This behavior seems pretty normal, as the data manipulation operations (map/reduce/foreach) are distributed throughout the Spark cluster.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Is there a way to force a reduce operation not to be distributed?&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is another attempt, using Spark SQL, but I get the same issue:&lt;/P&gt;&lt;PRE&gt;package scala

import java.text.SimpleDateFormat
import java.util.Calendar

import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileUtil, FileSystem}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}


import scala.collection.mutable.ListBuffer

object generator {
  def main(args: Array[String]): Unit = {

    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."

    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.default.name", HDFS_NAMENODE)
    val hdfs = FileSystem.get(hadoopConf)

    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER) 
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)

    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple =&amp;gt; {
      val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
      Row.fromSeq(row)
    })
    // inputDataRdd is RDD[Row]

    val schema = {
      val srcFields = List("f_int", "f_any").zipWithIndex.map(field =&amp;gt; StructField(
        field._1,
        field._2 match {
          case 0 =&amp;gt; IntegerType
          case 1 =&amp;gt; StringType
        },
        nullable = false))
      StructType(srcFields.toArray)
    }
    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")

    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
/*
+----------+
|     f_any|
+----------+
|       AAA|
|       AAA|
|       999|
|27-11-2015|
|       AAA|
|        AA|
|        AA|
|       AAA|
|         A|
|         A|
|       AAA|
|       999|
|       AAA|
+----------+
*/

    val res = sorted.rdd.reduce((row1, row2) =&amp;gt; Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row

    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]

    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")
  }
}&lt;/PRE&gt;&lt;P&gt;The data frame content is correctly sorted until I reduce it to a single row...&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is the HDFS output content; it may vary on each run:&lt;/P&gt;&lt;PRE&gt;999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA&lt;/PRE&gt;&lt;P&gt;Another case that troubles me is the following. Suppose I&amp;nbsp;want to update the String of each RDD row with a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", ...&lt;/P&gt;&lt;P&gt;To do that, I would use an integer that is incremented each time it is used.&lt;/P&gt;&lt;P&gt;Such a data manipulation would run using RDD.map(), but the same number may be used many times, as each worker will&amp;nbsp;increment its own copy of the integer... Is it possible, again, to tell the Spark engine that, for this map operation only, the job must not be distributed?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks for your feedback &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 16 Sep 2022 09:50:39 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34506#M11116</guid>
      <dc:creator>Grg</dc:creator>
      <dc:date>2022-09-16T09:50:39Z</dc:date>
    </item>
    <item>
      <title>Re: Spark: force map/reduce not to be distributed</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34721#M11117</link>
      <description>&lt;P&gt;Use `.coalesce(1, shuffle = true)` to repartition before sorting and reducing. Failing that, use an `aggregate` function that inserts the sorted data at the appropriate point in the list.&lt;/P&gt;</description>
      <pubDate>Thu, 03 Dec 2015 16:54:27 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34721#M11117</guid>
      <dc:creator>srita</dc:creator>
      <dc:date>2015-12-03T16:54:27Z</dc:date>
    </item>
    <item>
      <title>Re: Spark: force map/reduce not to be distributed</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34722#M11118</link>
      <description>For your second question, have you tried `zipWithIndex`?</description>
      <pubDate>Thu, 03 Dec 2015 16:53:36 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/34722#M11118</guid>
      <dc:creator>srita</dc:creator>
      <dc:date>2015-12-03T16:53:36Z</dc:date>
    </item>
    <item>
      <title>Re: Spark: force map/reduce not to be distributed</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/35210#M11119</link>
      <description>&lt;P&gt;Hi Srita and thanks for your answers &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;For the first one, I cannot apply the coalesce tip before sorting,&amp;nbsp;as&amp;nbsp;I just need to concatenate strings in a defined order. I will look into the aggregate function advice.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is a simple Spark code sample that reproduces the issue without the Spark SQL parts:&lt;/P&gt;&lt;PRE&gt;    // Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
    // Expected output is: "B","D","A","C" (String values concatenated using the order defined by the Int values)

    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x =&amp;gt; x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) =&amp;gt; c})
    // RDD[String]

    val reducedData = sortedData.reduce((valueA, valueB) =&amp;gt; valueA + "," + valueB).toString
    // String in CSV format

    println(reducedData)
    // Expected: B,D,A,C
    // Execution 1 - Result is: A,D,C,B
    // Execution 2 - Result is: A,C,D,B
    // Execution 3 - Result is: C,B,A,D
    // ...&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;For the second one, I have been advised to use &lt;A href="http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka" target="_self"&gt;Spark accumulators&lt;/A&gt;.&amp;nbsp;The&amp;nbsp;zipWithIndex method seems to be a good alternative.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 14 Dec 2015 17:08:41 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/35210#M11119</guid>
      <dc:creator>Grg</dc:creator>
      <dc:date>2015-12-14T17:08:41Z</dc:date>
    </item>
    <item>
      <title>Re: Spark: force map/reduce not to be distributed</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/35321#M11120</link>
      <description>&lt;P&gt;Here is the answer, thanks again Srita.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
...
}&lt;/PRE&gt;&lt;P&gt;Application source code:&lt;/P&gt;&lt;PRE&gt;    val data = sc.parallelize(Seq((166,"A"),(2,"B"),(200,"C"),(100,"D")))
    // RDD[(Int, String)]

    val keyedSortedData = data.keyBy(x =&amp;gt; x._1).sortByKey()
    // RDD[(Int, (Int, String))]

    val sortedData = keyedSortedData.map({case (a,(b,c)) =&amp;gt; c})
    // RDD[String]

    val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) =&amp;gt; valueA + "," + valueB).toString
    // String in CSV format
    // also works using coalesce(1, shuffle = false) or repartition(1)

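    // Alternative sketch (not part of the original answer; it assumes the result is small
    // enough to fit in driver memory): collect() returns the partitions of a sorted RDD in
    // order, so the values can be joined on the driver without a single-partition reduce.
    // collectedData is a hypothetical name used only for illustration.
    val collectedData = sortedData.collect().mkString(",")
    // String in CSV format, built on the driver
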
    println(reducedData)&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 16 Dec 2015 12:56:02 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Spark-force-map-reduce-no-to-be-distributed/m-p/35321#M11120</guid>
      <dc:creator>Grg</dc:creator>
      <dc:date>2015-12-16T12:56:02Z</dc:date>
    </item>
  </channel>
</rss>

