Member since: 10-12-2015
Posts: 63
Kudos Received: 56
Solutions: 13
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 24599 | 02-10-2017 12:35 AM
 | 1805 | 02-09-2017 11:00 PM
 | 1177 | 02-08-2017 04:48 PM
 | 2890 | 01-23-2017 03:11 PM
 | 4714 | 11-22-2016 07:33 PM
10-27-2016
06:48 PM
1 Kudo
Sounds like a JDBC connection is in order. There is an API for creating a DataFrame from a JDBC connection: jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame. The issue with JDBC is that reading data from Teradata will be much slower than reading from HDFS. Is it possible to run a Sqoop job to move the data to HDFS before starting your Spark application?
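A minimal sketch of the JDBC read, assuming a Spark 1.x spark-shell session (sqlContext already defined); the host, database, table, and credentials are placeholders, and the Teradata JDBC driver jar is assumed to be on the classpath:

```scala
import java.util.Properties

// Placeholder connection details -- substitute your Teradata host, database, and credentials.
val url = "jdbc:teradata://td-host/DATABASE=mydb"

val props = new Properties()
props.setProperty("user", "myuser")
props.setProperty("password", "mypassword")
props.setProperty("driver", "com.teradata.jdbc.TeraDriver")

// Pull the table over the JDBC connection into a DataFrame.
val df = sqlContext.read.jdbc(url, "mytable", props)
df.show(5)
```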
10-24-2016
10:40 PM
1 Kudo
@Steevan Rodrigues saveAsTextFile takes a parameter that sets the codec to compress with: rdd.saveAsTextFile(filename, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
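For reference, a sketch of the equivalent call in Scala; the rdd variable and the output path are placeholders:

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Pass the codec class as the second argument to compress the output.
rdd.saveAsTextFile("/tmp/output-gzip", classOf[GzipCodec])
```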
09-05-2016
08:10 PM
The Databricks CSV library skips core Spark. The map function in PySpark runs through a Python subprocess on each executor, whereas Spark SQL with the Databricks CSV library goes through the Catalyst optimizer and the output is Java bytecode. Scala/Java is roughly 40% faster than Python when using core Spark, and I would guess that is why the second implementation is much faster. The CSV library is also probably more efficient at breaking up the records, applying the split partition by partition rather than record by record.
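A sketch of the two approaches being compared, assuming a Spark 1.x spark-shell with the com.databricks:spark-csv package on the classpath; the file path and header option are illustrative:

```scala
// 1) Core Spark: split each record yourself, record by record, in the language of the closure.
val rddParsed = sc.textFile("/data/input.csv").map(_.split(","))

// 2) Spark SQL + Databricks CSV: parsing and downstream operations are planned by Catalyst.
val dfParsed = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/data/input.csv")
```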
08-01-2016
02:15 AM
1 Kudo
If I had to guess, you're using Spark 1.5.2 or earlier. What is happening is that you're running out of memory; I think it's executor memory, probably during a map-side aggregate. How many keys do you have? I think we can fix this pretty simply. Are you caching data? If not, set spark.shuffle.memoryFraction to a number higher than 0.4.
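A minimal sketch, assuming the legacy (pre-1.6) memory settings that apply to Spark 1.5.x and earlier; the exact values are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("aggregate-job")
  // Give the shuffle more executor memory than the 0.2 default...
  .set("spark.shuffle.memoryFraction", "0.5")
  // ...and, if nothing is cached, shrink the storage fraction to compensate.
  .set("spark.storage.memoryFraction", "0.3")

val sc = new SparkContext(conf)
```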
08-01-2016
02:08 AM
If you call groupByKey on a DataFrame, it implicitly converts the DataFrame to an RDD, so you lose all the benefits of the Catalyst optimizer for that step.
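A sketch of the difference, assuming a spark-shell session and an illustrative DataFrame df with columns "key" and "value":

```scala
// Stays inside Catalyst: the optimizer can plan a partial aggregation.
val viaDataFrame = df.groupBy("key").sum("value")

// Drops to the RDD API: no Catalyst optimization for this step, and groupByKey
// shuffles every record before aggregating.
val viaRdd = df.rdd
  .map(row => (row.getString(0), row.getLong(1)))
  .groupByKey()
  .mapValues(_.sum)
```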
07-31-2016
09:29 PM
Agreed 100%. If you can accomplish the same task using reduceByKey, do it: it implements a combiner, so it basically does the aggregation locally within each partition and then shuffles only the partial results. Just keep an eye on GC when doing this.
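A small sketch of the contrast, runnable in spark-shell (sc already defined); the data is toy data:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// groupByKey: every record is shuffled, then summed on the reduce side.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey: partial sums are computed within each partition (the combiner),
// and only those partial results are shuffled.
val viaReduce = pairs.reduceByKey(_ + _)

viaReduce.collect().foreach(println)
```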
07-31-2016
09:27 PM
Use the explain API; you should see a broadcast operator (something like BroadcastHashJoin) in the physical plan if a broadcast join is happening. Also, make sure you've enabled code generation for Spark SQL ("spark.sql.codegen=true"). Older versions of Spark (1.4 and earlier) have it set to false by default.
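A sketch of both checks, assuming a Spark 1.x spark-shell; largeDf and smallDf are placeholders for your two tables:

```scala
// Enable code generation for Spark SQL.
sqlContext.setConf("spark.sql.codegen", "true")

// Join and print the physical plan; look for a broadcast join operator.
val joined = largeDf.join(smallDf, largeDf("id") === smallDf("id"))
joined.explain()
```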
07-31-2016
09:19 PM
3 Kudos
When creating a Kafka receiver, it's one receiver per topic partition. You can definitely repartition the data after receiving it from Kafka; this distributes the work across all of your workers instead of having only 5 executors do it.
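A sketch of the receiver-based stream with a repartition step, assuming Spark 1.x with spark-streaming-kafka on the classpath; the ZooKeeper quorum, group id, topic name, thread count, and partition count are placeholders:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 2))

// Spread the received records across the whole cluster before the heavy processing.
val repartitioned = stream.repartition(40)
repartitioned.count().print()

ssc.start()
ssc.awaitTermination()
```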
07-31-2016
09:17 PM
2 Kudos
Try setting your join to a broadcast join. By default, Spark SQL does a broadcast join for tables smaller than 10 MB. In this case I think it makes a lot of sense to change the setting spark.sql.autoBroadcastJoinThreshold to 250 MB. That gives you a map-side join in MapReduce terms, which should be much quicker than what you're experiencing. Also, don't worry about having a large number of tasks; it's perfectly fine to have that many. I've found that beyond 4 cores per executor you hit diminishing returns (i.e. 4 cores gives ~400% throughput, 5 cores only ~430%). Another setting to investigate is spark.sql.shuffle.partitions, the number of partitions used when shuffling data for joins; it defaults to 200, and I think you may want to raise it quite a bit.
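A sketch of the two settings discussed above, assuming a spark-shell session; the 250 MB threshold (the value is in bytes) and the 800 shuffle partitions are the suggested values here, not defaults, and factDf/dimDf are placeholders for your tables:

```scala
// Raise the auto-broadcast threshold so the ~250 MB table qualifies for broadcasting.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (250 * 1024 * 1024).toString)

// Increase the number of partitions used when shuffling data for joins (default 200).
sqlContext.setConf("spark.sql.shuffle.partitions", "800")

// The smaller table is now broadcast to every executor -- a map-side join in MapReduce terms.
val result = factDf.join(dimDf, factDf("id") === dimDf("id"))
result.explain()
```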