Member since 08-14-2015, 24 posts, 2 kudos received, 4 solutions

My Accepted Solutions

| Views | Posted |
|---|---|
| 16802 | 09-01-2015 12:19 AM |
| 5328 | 09-01-2015 12:04 AM |
| 87317 | 08-14-2015 07:45 AM |
| 14738 | 08-14-2015 07:42 AM |
12-16-2015
04:55 AM
Here is the answer, thanks again Srita. The relevant excerpt from Spark's `RDD` source:

```scala
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  ...
}
```

Application source code:

```scala
val data = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))
// RDD[(Int, String)]
val keyedSortedData = data.keyBy(x => x._1).sortByKey()
// RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map { case (_, (_, value)) => value }
// RDD[String]
val reducedData = sortedData.coalesce(1, shuffle = true).reduce((valueA, valueB) => valueA + "," + valueB)
// String in CSV format
// also works using coalesce(1, shuffle = false) or repartition(1)
println(reducedData)
```
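The `coalesce` doc comment says that a no-shuffle coalesce is a narrow dependency in which each new partition claims a run of the current partitions. The plain-Scala sketch below models that grouping to show why a single output partition can read the sorted partitions back in index order. It is not Spark code: `NarrowCoalesceModel`, `groups` and `coalesceData` are hypothetical names, and the model assumes the parent RDD reports no preferred locations (Spark's real coalescer also balances partitions by locality, which is ignored here).

```scala
// Plain-Scala model, not Spark code: how a no-shuffle coalesce can group parent
// partitions when the parent reports no preferred locations. Spark's real
// coalescer also balances by locality, which this model deliberately ignores.
object NarrowCoalesceModel {

  /** For each new partition, the ordered indices of the parent partitions it claims. */
  def groups(parents: Int, target: Int): Vector[Vector[Int]] = {
    require(parents > 0 && target > 0, "partition counts must be positive")
    val n = math.min(target, parents) // without a shuffle the partition count cannot grow
    Vector.tabulate(n) { i =>
      val start = ((i.toLong * parents) / n).toInt
      val end = (((i.toLong + 1) * parents) / n).toInt
      (start until end).toVector // a contiguous run of parent partitions
    }
  }

  /** Data of each new partition: its parents' records, read in parent-index order. */
  def coalesceData[T](parentData: Vector[Vector[T]], target: Int): Vector[Vector[T]] =
    groups(parentData.size, target).map(group => group.flatMap(i => parentData(i)))
}
```

For example, `NarrowCoalesceModel.groups(1000, 100)` yields 100 groups of 10 consecutive parent indices, matching the "1000 to 100" case in the doc comment, and coalescing sorted partitions `Vector(Vector("B"), Vector("D"), Vector("A", "C"))` to one partition gives `Vector(Vector("B", "D", "A", "C"))` under this model.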
12-14-2015
09:04 AM
Hi Srita, and thanks for your answers 🙂

For the first one, I cannot use the coalesce tip before sorting, as I just need to concatenate strings in a defined order. I will look at your advice about the aggregate function.

Here is a simple Spark code sample that reproduces the issue easily, without any Spark SQL:

```scala
// Input data: (166,"A"),(2,"B"),(200,"C"),(100,"D")
// Expected output: "B","D","A","C" (String values concatenated in the order defined by the Int values)
val data = sc.parallelize(Seq((166, "A"), (2, "B"), (200, "C"), (100, "D")))
// RDD[(Int, String)]
val keyedSortedData = data.keyBy(x => x._1).sortByKey()
// RDD[(Int, (Int, String))]
val sortedData = keyedSortedData.map { case (_, (_, value)) => value }
// RDD[String]
val reducedData = sortedData.reduce((valueA, valueB) => valueA + "," + valueB)
// String in CSV format
println(reducedData)
// Expected: B,D,A,C
// Execution 1 - Result is: A,D,C,B
// Execution 2 - Result is: A,C,D,B
// Execution 3 - Result is: C,B,A,D
// ...
```

For the second one, I have been advised to use Spark accumulators. The zipWithIndex method seems to be a good alternative.
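Why do the results of this sample change between runs? `RDD.reduce` first reduces each partition locally, then merges the per-partition results on the driver as the tasks finish, and its scaladoc asks for a commutative and associative operator. String concatenation is associative but not commutative, so the finishing order leaks into the output. The plain-Scala sketch below models that behaviour; it is not Spark code, `ReduceOrderModel`, `reduceLike` and `collectLike` are hypothetical names, and the finishing order is passed in explicitly instead of coming from a scheduler.

```scala
// Plain-Scala model, not Spark code, of how a distributed reduce combines results:
// each partition is reduced locally, then the partial results are merged in the
// order in which the tasks finish (passed in explicitly here).
object ReduceOrderModel {

  val concat: (String, String) => String = (a, b) => a + "," + b

  /** reduce-like: merge the per-partition results in task-completion order. */
  def reduceLike(partitions: Vector[Vector[String]], completionOrder: Seq[Int]): String = {
    val partials = completionOrder
      .map(i => partitions(i))                    // partitions, in the order they finish
      .filter(_.nonEmpty)                         // empty partitions produce no partial result
      .map(records => records.reduceLeft(concat)) // local reduce inside one partition
    require(partials.nonEmpty, "empty collection")
    partials.reduceLeft(concat)                   // merge step, in completion order
  }

  /** collect-like: per-partition results are stored by partition index, then concatenated. */
  def collectLike(partitions: Vector[Vector[String]]): String =
    partitions.flatten.mkString(",")
}
```

Assuming, for illustration only, that each record of the sorted data sits in its own partition (`Vector(Vector("B"), Vector("D"), Vector("A"), Vector("C"))`), the finishing orders 2,1,3,0 and 2,3,1,0 and 3,0,2,1 reproduce the three outputs reported above, while `collectLike` always gives `B,D,A,C`. In Spark itself, the `collectLike` path corresponds to `sortedData.collect().mkString(",")`: the `sortByKey` scaladoc states that calling `collect` on the sorted RDD returns an ordered list of records. It brings all values to the driver, which is acceptable here since the final String ends up on the driver anyway.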
11-27-2015
07:50 AM
Hello,

I have a simple Spark RDD[(Int, String)]. The Int is a position and the String is a value. My goal is to produce a single String from this RDD by concatenating all the values. The difficulty is that I need to keep the order of the values defined by the Int positions 🙂

Here is my set of values:

```scala
val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
```

Here is my expected String:

"AAA","AAA",999,"27-11-2015","AAA","AA","AA","AAA","A","A","AAA",999,"AAA"

which respects the order defined by the Int keys: 1,2,33,83,146,155,160,166,173,180,184,185,186

I'm able to define a new Spark RDD from these inputs. I can convert it to a pair RDD with the Int as the key (position) and the String as the value, and then I can easily sort it. When I try to extract the RDD values and concatenate them into a string (reduce), I lose the order... This behavior seems fairly normal, as the data manipulation operations (map/reduce/foreach) are distributed across the Spark cluster. Is there a way to force a reduce operation not to be distributed?

Here is another try, using Spark SQL, but I get the same issue:

```scala
package scala

import java.text.SimpleDateFormat
import java.util.Calendar
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileUtil, FileSystem}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ListBuffer

object generator {
  def main(args: Array[String]): Unit = {
    val SPARK_MASTER = "local[*]" // "local[*]" - "spark://...:7077"
    val HDFS_NAMENODE = "hdfs://...:8020"
    val HDFS_WORK_PATH = "/user/..."

    // Initialize the environment - Hadoop
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.default.name", HDFS_NAMENODE)
    val hdfs = FileSystem.get(hadoopConf)

    // Initialize the environment - Spark
    val sConf = new SparkConf().setAppName("test")
    sConf.setMaster(SPARK_MASTER)
    val sc = new SparkContext(sConf)
    val sqlContext = new SQLContext(sc)

    val inputDataRdd = sc.parallelize(Seq((166,"AAA"),(173,"A"),(180,"A"),(184,"AAA"),(185,999),(186,"AAA"),(1,"AAA"),(2,"AAA"),(33,999),(83,"27-11-2015"),(146,"AAA"),(155,"AA"),(160,"AA")))
      .map(tuple => {
        val row = Seq(tuple._1, String.valueOf(tuple._2)) // force String type for data
        Row.fromSeq(row)
      })
    // inputDataRdd is RDD[Row]

    val schema = {
      val srcFields = List("f_int", "f_any").zipWithIndex.map(field => StructField(
        field._1,
        field._2 match {
          case 0 => IntegerType
          case 1 => StringType
        },
        nullable = false))
      StructType(srcFields.toArray)
    }

    val dataframe = sqlContext.createDataFrame(inputDataRdd, schema)
    dataframe.registerTempTable("input_dataframe")
    val sorted = sqlContext.sql("select f_any from input_dataframe order by f_int")
    sorted.show()
    /*
    +----------+
    |     f_any|
    +----------+
    |       AAA|
    |       AAA|
    |       999|
    |27-11-2015|
    |       AAA|
    |        AA|
    |        AA|
    |       AAA|
    |         A|
    |         A|
    |       AAA|
    |       999|
    |       AAA|
    +----------+
    */

    val res = sorted.rdd.reduce((row1, row2) => Row.fromSeq(Seq(row1(0) + "," + row2(0)))) // res is a Row
    val resRdd = sc.parallelize(res.toSeq) // resRdd is RDD[Any]
    resRdd.saveAsTextFile(HDFS_NAMENODE + HDFS_WORK_PATH + "/test_output.csv")
  }
}
```

The DataFrame content is correctly sorted, until I reduce it to a single row... Here is the HDFS output content; it may vary on each run:

999,AAA,AAA,AAA,A,AA,AA,AAA,AAA,A,999,27-11-2015,AAA

Another case troubles me. Suppose I want to update the String of each RDD row with a code defined by a sequence, for example "myrow_1", "myrow_2", "myrow_3", and so on. To do that, I would use an integer that is incremented each time it is used. Such a manipulation would run with RDD.map(), but the same number may be used many times, because each worker increments its own copy of the integer... Again, isn't it possible to tell the Spark engine not to distribute the job, for this map operation only?

Thanks for your feedback 🙂
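A shared counter cannot work across workers, as described in the question. Spark's `RDD.zipWithIndex()` is documented to assign `Long` indices ordered first by partition index and then by position within each partition, and to trigger an extra Spark job when the RDD has more than one partition. The plain-Scala sketch below models that idea; it is not Spark's implementation, and `ZipWithIndexModel`, `startOffsets`, `zipWithGlobalIndex` and `rowLabels` are hypothetical names. It counts the records per partition, turns the counts into start offsets with a prefix sum, then numbers each partition locally.

```scala
// Plain-Scala model, not Spark code, of the idea behind RDD.zipWithIndex:
// count records per partition, prefix-sum the counts into start offsets,
// then let each partition number its own records from its offset.
// No shared mutable counter is involved, so partitions never collide.
object ZipWithIndexModel {

  /** Start offset of each partition = number of records in all earlier partitions. */
  def startOffsets[T](partitions: Vector[Vector[T]]): Vector[Long] =
    partitions.map(_.size.toLong).scanLeft(0L)(_ + _).init

  /** Pairs every record with a global Long index, partition by partition. */
  def zipWithGlobalIndex[T](partitions: Vector[Vector[T]]): Vector[Vector[(T, Long)]] = {
    val offsets = startOffsets(partitions)
    partitions.zip(offsets).map { case (records, start) =>
      records.zipWithIndex.map { case (r, i) => (r, start + i) } // purely local work
    }
  }

  /** Builds "myrow_1", "myrow_2", ... labels without a shared counter. */
  def rowLabels[T](partitions: Vector[Vector[T]]): Vector[Vector[String]] =
    zipWithGlobalIndex(partitions).map(_.map { case (_, idx) => s"myrow_${idx + 1}" })
}
```

On a real RDD, the equivalent would be something like `sortedRdd.zipWithIndex().map { case (value, idx) => s"myrow_${idx + 1}" }`, where `sortedRdd` is a placeholder for whatever RDD holds the rows. The counting pass is the reason the documentation mentions an extra job.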
Labels: Apache Hadoop, Apache Spark, HDFS
09-19-2015
05:26 AM
Hello, when writing a file to HDFS from a Spark application in Scala, I cannot find a way to limit the HDFS resources to be used. I know I can use a Hadoop configuration for my Hadoop FileSystem object, which is used for data manipulation such as deleting a file. Is there a way to tell it that, even though I have 3 datanodes and each written file is supposed to be distributed to at least 2 partitions, I want to enforce it to be split and distributed over 3 partitions and datanodes? I would like to do this programmatically, rather than configure the Hadoop cluster and restart it, which would impact all Spark applications. Thanks in advance for your feedback 🙂
Labels: Apache Hadoop, Apache Spark, HDFS
09-01-2015
12:19 AM
I made my choice: Spark-JobServer. This project matches my needs almost exactly: it allows RDDs to be shared between applications, since they share a context. It supports Spark SQL/Hive contexts, and it works without installing a new component on all cluster nodes 🙂
09-01-2015
12:04 AM
To summarize: one may use Spark SQL (or Hive) to write SQL queries with complex joins. Otherwise, with the core Spark API, one can and must describe the execution plan, so each join has to be written separately.
08-26-2015
04:25 AM
Hello, thanks for your reply; this is a very interesting feature you have pointed out! I will have a look at it and check whether it also works for complex joins (like outer joins). Greg.
08-20-2015
08:16 AM
Hello, I still need to dig into this, but I will also check MapFiles, which are indexed SequenceFiles. I'll provide my feedback then 🙂 Greg.
08-20-2015
08:15 AM
Hello there 🙂 I tested HDFS performance and I admit it may be sufficient for my needs, thanks for the lead! Moreover, as the Kafka and Tachyon integrations are still experimental and quite heavyweight, I looked for something else and found the spark-jobserver project, which may be exactly what I need: a server Spark application opens the SparkContext and manages RDDs for client Spark applications. It may do the job; I'll look into it. https://github.com/spark-jobserver/spark-jobserver
08-19-2015
12:18 AM
Thanks for your suggestions and documentation links, Wilfred. I will have a look at sequence files today.