Member since: 05-17-2016
Posts: 190
Kudos Received: 46
Solutions: 11
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1387 | 09-07-2017 06:24 PM |
| | 1790 | 02-24-2017 06:33 AM |
| | 2575 | 02-10-2017 09:18 PM |
| | 7066 | 01-11-2017 08:55 PM |
| | 4707 | 12-15-2016 06:16 PM |
07-28-2016
06:16 PM
1 Kudo
wholeTextFiles was a nice approach. I used the approach below after running into the issue with the databricks library:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// conf is a Hadoop Configuration whose input path points at the CSV file
val files = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// keep only the records whose byte offset is not 0, i.e. skip the header line
val headerLessRDD = files.filter(f => f._1.get != 0).values.map { x => Row.fromSeq(x.toString.split(",")) }
// the record at offset 0 is the header; use it to build the schema
val header = files.filter(f => f._1.get == 0).first()._2.toString
val schema = StructType(header.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val dataFrame = sqlContext.createDataFrame(headerLessRDD, schema)
```

The basic idea is to read the file using TextInputFormat and skip a line if its start offset is 0.
07-28-2016
01:26 PM
You can apply a filter or the bincond operator on any column(s) of your relation X. More details on the available operators are in the Pig documentation.
07-27-2016
07:13 PM
2 Kudos
You can use org.apache.pig.piggybank.evaluation.IsNumeric, something like:

```pig
-- the piggybank jar needs to be registered for IsNumeric to resolve
X = foreach Y generate ((org.apache.pig.piggybank.evaluation.IsNumeric($1) == true) ? (int)$1 : null);
```

I have applied the generate to one column; you can add the rest of the columns.
07-27-2016
01:17 PM
Thanks for the update @Mukesh Kumar. Is it worth doing one-by-one writes, or do you want to explore the BulkLoad option in Cassandra?
07-26-2016
07:07 PM
Just wanted to know if you have dynamic allocation enabled and, if so, what the values are for the initial and max executors. Also, could I ask how many nodes you have, and how many cores and how much RAM per node?
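For reference, these are the settings being asked about; a minimal sketch with placeholder values rather than recommendations:

```scala
import org.apache.spark.SparkConf

// Placeholder values, for illustration only.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  // dynamic allocation also requires the external shuffle service
  .set("spark.shuffle.service.enabled", "true")
```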
07-26-2016
06:49 PM
2 Kudos
Thanks @Mukesh Kumar for adding your answer.

However, I would like to add a note on the general aspects that can be looked at for improving performance.

Parallelism
As you rightly said, parallelism is one important aspect. We can adjust parallelism in code by increasing or decreasing the number of partitions: where you have a lot of free cores and comparatively few partitions, you can increase the number of partitions; where you derive an RDD from an existing RDD and keep only a small fraction of the parent's data, there may be no benefit in the child RDD having the same number of partitions as its parent, so you can try reducing the partition count.

Other aspects you can consider are:

Data Locality
This refers to how close the data and the code are to each other. There can be situations where no CPU cycles are free to start a task locally, and Spark has to decide whether to:
- wait, so that no data movement is required, or
- move the task over to a free CPU and start it there, which means the data has to be moved.
The wait time can be configured with the spark.locality.wait* properties. Based on the application, we can decide whether waiting a little longer saves time compared to shuffling the data across the network.

Data Serialization
There are situations where the framework has to ship data over the network or persist it. In such scenarios the objects are serialized. Java serialization is used by default; however, serialization frameworks like Kryo have shown better results than the default. The serializer can be set using spark.serializer.

Memory Management
Another important aspect that you have already touched on is memory management. If the application does not need to persist/cache data, you could try reducing the storage memory fraction and increasing the execution memory. This works because execution memory can evict storage memory up to the configured storage threshold, while the reverse is not true. We can adjust these values by changing spark.memory.fraction and spark.memory.storageFraction.

On a side note, all Java GC tuning methods can be applied to Spark applications as well. We can collect GC statistics using the Java options -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps. Also note that serialization helps reduce the GC overhead of managing a large number of small objects.

Finally, and most importantly, we should look at our own code. Some points to consider:
- Carry forward only data that is worth carrying, i.e. apply filtering, schema validation on structured datasets, etc. up front before propagating the data to downstream logic/aggregations.
- Know the data size: think before calling collect, and use a sample/subset for debugging and testing.
- Target the low-hanging fruit: prefer reduceByKey over groupByKey, which avoids a lot of data shuffle over the network.
- Consider using broadcast variables for caching large read-only data.
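To tie the configuration knobs above together, here is a minimal sketch of how they could be set on a SparkConf; the values are placeholders for illustration, not recommendations:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Data serialization: switch from the default Java serialization to Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Data locality: how long a task waits for a local slot before data is shipped (placeholder)
  .set("spark.locality.wait", "3s")
  // Memory management: unified memory fraction and the storage share within it (placeholders)
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.storageFraction", "0.3")
  // GC diagnostics on the executors
  .set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
```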
07-20-2016
01:32 PM
Once you start the sandbox, the URL will be displayed within your VM. For the online shell, try http://localhost:4200/
For detailed instructions, refer to http://hortonworks.com/hadoop-tutorial/learning-the-ropes-of-the-hortonworks-sandbox/
07-20-2016
01:12 PM
I thought you had the option to write the file to HDFS using a Spark application, and hence assumed you had a list of Vectors from which you can make an RDD by calling parallelize.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}   // assuming MLlib's Vector
import scala.collection.mutable.ListBuffer

val sc = new SparkContext("local[1]", "Sample App")

val v1: Vector = Vectors.dense(2.0, 3.0, 4.0)
val v2: Vector = Vectors.dense(5.0, 6.0, 7.0)
val list = new ListBuffer[Vector]()
list += v1
list += v2

val listRdd = sc.parallelize(list)
listRdd.saveAsObjectFile("localFile")

// read it back to an RDD of Vector in another application
val fileRdd = sc.objectFile[Vector]("localFile")
```

The same methods are available on JavaSparkContext and JavaRDD.
07-19-2016
06:37 PM
Try storing the RDD to disk using saveAsObjectFile; you can read it back with objectFile():

```scala
vectorListRdd.saveAsObjectFile("<path>")
val fileRdd = sc.objectFile[Vector]("<path>")
```
07-18-2016
01:45 PM
If spark-csv_2.10-1.4.0.jar is your application, please submit it using spark-submit rather than running it as a plain Java application. Could you explain a little more about what the application does? What is the data source? How do you turn your data into a DataFrame, etc.?
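For context, and purely as an illustration (the path and options below are hypothetical), the spark-csv package is normally used as a library from inside a Spark application launched with spark-submit, along these lines:

```scala
import org.apache.spark.sql.SQLContext

// sc is assumed to be an existing SparkContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // assumes the file has a header row
  .load("/path/to/input.csv")      // placeholder path
```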