Member since: 06-29-2016
Posts: 30
Kudos Received: 3
Solutions: 0
01-18-2020
02:50 AM
The Spark Catalyst optimizer is smart. If it is not optimizing a query well you may need to step in yourself, but in most cases it handles the optimization for you. Below is one example:

fr = spark.createDataFrame([{'a': 1}, {'b': 2}])
fr.select('a', 'b').drop('a')

The parsed logical plan for the query above is:

== Parsed Logical Plan ==
Project [b#69L]
+- Project [a#68L, b#69L]
   +- LogicalRDD [a#68L, b#69L], false

And the physical plan is:

== Physical Plan ==
*(1) Project [b#69L]
+- *(1) Scan ExistingRDD[a#68L,b#69L]

Spark optimizes the query from two projections down to a single projection, which is the same as the physical plan of fr.select('b').
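A minimal Scala sketch of the same check (the data and column names here are illustrative, not from the original post); explain(true) prints the parsed, analyzed, optimized, and physical plans so you can see the collapsed projection:

import org.apache.spark.sql.SparkSession

object ProjectionCollapseCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("catalyst-check").master("local[*]").getOrCreate()
    import spark.implicits._

    // Two columns, one row -- just enough to produce a plan.
    val fr = Seq((1L, 2L)).toDF("a", "b")

    // select followed by drop: Catalyst collapses the two projections into one.
    val result = fr.select("a", "b").drop("a")

    // extended = true prints parsed, analyzed, optimized, and physical plans.
    result.explain(true)

    spark.stop()
  }
}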
08-08-2016
09:15 PM
Use Kryo when working with RDDs. It probably won't help with DataFrames; I have never used Kryo with DataFrames, so maybe you can test and post your results. A minimal configuration sketch is below.
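A minimal sketch of enabling Kryo for an RDD workload, assuming a plain SparkConf setup; the SensorReading case class is only there to illustrate class registration:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type, used only to show class registration.
case class SensorReading(id: Long, value: Double)

object KryoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-example")
      .setMaster("local[*]")
      // Switch the serializer used for shuffled and serialized cached data.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registering classes avoids writing full class names with every record.
      .registerKryoClasses(Array(classOf[SensorReading]))

    val sc = new SparkContext(conf)
    val readings = sc.parallelize(Seq(SensorReading(1L, 0.5), SensorReading(2L, 1.5)))
    println(readings.map(_.value).sum())
    sc.stop()
  }
}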
08-01-2016
06:23 PM
I should have read the post a little closer; I thought you were doing a groupByKey. You are correct, you need to use groupBy to keep the execution within the DataFrame API and out of Python. However, you said you are doing an outer join. If it is a left join and the right side is larger than the left, then do an inner join first, and then do your left join on the result. The result of the inner join will most likely be small enough to be broadcast for the left join. This is a pattern that Holden described at Strata this year in one of her sessions; a rough sketch of it is below.
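A rough sketch of that inner-join-first pattern, assuming a small left side, a large right side, and a shared "id" join column (all names here are illustrative):

import org.apache.spark.sql.SparkSession

object JoinPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join-pattern").master("local[*]").getOrCreate()
    import spark.implicits._

    // Illustrative data: a small left side and a larger right side.
    val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "leftVal")
    val right = Seq((1, 10.0), (1, 11.0), (2, 20.0)).toDF("id", "rightVal")

    // Step 1: the inner join shrinks the right side to only the keys that matter.
    val matched = left.join(right, Seq("id"))

    // Step 2: left join the original left side against the (now small) result,
    // which is a good candidate for a broadcast join.
    val result = left.join(matched.drop("leftVal"), Seq("id"), "left_outer")

    result.show()
    spark.stop()
  }
}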
07-26-2016
03:21 PM
Some thoughts/questions:

What does the key distribution look like? If it is lumpy, perhaps a repartition would help. Looking at the Spark UI might also give some insight into the bottleneck. Bumping up spark.sql.autoBroadcastJoinThreshold to 300M might help ensure that the map-side join (broadcast join) happens; check the documentation though, because it notes "...that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run." A sketch of raising that threshold is below.

Other answers:

"Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be much better?" I doubt it. There are probably opportunities to tune what you have, and that tuning would be needed in 2.0 anyway.

"What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method." If you want to post more of your code, we can comment on that. It is hard to tell whether the RDD API's more granular control would help without the bigger picture.

"I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?" Yes, the threshold matters and should be above the data size; think of this like a map-side join. No, you should not need to call broadcast explicitly. However, if you did, you could not broadcast the DataFrame itself; it would have to be a collection loaded up in the driver. Check here for info on broadcast variables: https://spark.apache.org/docs/0.8.1/scala-programming-guide.html#broadcast-variables

"What's the point of driver memory?" When performing something like collect, the results are brought back to the driver. If you're collecting a lot of results, you'll need to worry about that driver-memory setting.

"Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?" Looks good, but we could give more assistance with the full code. Also, look at the Spark UI and walk the DAG to see where the bottleneck is.
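A minimal sketch of raising the broadcast threshold and checking that the plan picks a broadcast join. The DataFrames and the 300 MB figure are illustrative, and the config call uses the Spark 2.x SparkSession API; on 1.6 the same key would be set through sqlContext.setConf:

import org.apache.spark.sql.SparkSession

object BroadcastThresholdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-threshold").master("local[*]").getOrCreate()
    import spark.implicits._

    // Raise the auto-broadcast threshold to roughly 300 MB (the value is in bytes).
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (300L * 1024 * 1024).toString)

    // Illustrative DataFrames standing in for the real large/small tables.
    val big   = Seq((1, "x"), (2, "y"), (3, "z")).toDF("id", "payload")
    val small = Seq((1, "A"), (2, "B")).toDF("id", "dim")

    val joined = big.join(small, Seq("id"))

    // The physical plan should show a BroadcastHashJoin when the small side
    // fits under the threshold.
    joined.explain()
    spark.stop()
  }
}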
07-22-2016
09:11 PM
2 Kudos
Could you filter the empty string before the group?

df.where(df("number") !== "").groupBy($"id").agg(concat_ws(DELIM, collect_list($"num")))
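A self-contained sketch of the same idea, with illustrative column names and a comma standing in for DELIM; it assumes the empty values live in the same column that gets collected:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, concat_ws}

object ConcatNonEmptySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("concat-non-empty").master("local[*]").getOrCreate()
    import spark.implicits._

    val delim = ","  // stand-in for DELIM in the original snippet
    val df = Seq((1, "10"), (1, ""), (2, "20"), (2, "21")).toDF("id", "number")

    // Drop empty strings first, then collect and join the remaining values per id.
    val result = df
      .where($"number" =!= "")
      .groupBy($"id")
      .agg(concat_ws(delim, collect_list($"number")).as("numbers"))

    result.show()
    spark.stop()
  }
}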
05-28-2018
08:58 AM
Hello people! Could anyone help me out? My dataset contains a timestamp field, and I need to extract the year, the month, the day, and the hour from it. I typed these lines:

training.createOrReplaceTempView("df")
spark.udf.register("getCurrentHour", getCurrentHour _)
val hour = spark.sql("select getCurrentHour(payload_MeterReading_IntervalBlock_IReading_endTime) as hour from df")
spark.udf.register("assignTod", assignTod _)
timestamps_df.createOrReplaceTempView("timestamps")
val tod = spark.sql("select assignTod(hour) as tod from timestamps")
The problem is that I am not good at Scala, so I couldn't figure out the best solution! The two functions I used to extract the hour and assign it to a part of the day:

def assignToDay(hr: Integer): String = {
  if (hr >= 7 && hr < 12) {
    return "morning"
  } else if (hr >= 12 && hr < 14) {
    return "lunch"
  } else if (hr >= 14 && hr < 18) {
    return "afternoon"
  } else if (hr >= 18 && hr < 23) {
    return "evening"
  } else if (hr >= 23 && hr <= 24) {
    return "night"
  } else if (hr < 7) {
    return "night"
  } else {
    return "error"
  }
}
def getCurrentHour(dateStr: String): Integer = {
  // needs: import java.util.Date at the top of the file
  val fallbackHour = 0
  try {
    val date = new Date(dateStr.toLong)  // dateStr is expected to hold epoch milliseconds
    int2Integer(date.getHours)           // Date.getHours is deprecated but still works
  } catch {
    case _: Throwable => int2Integer(fallbackHour)
  }
}
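As a side note, a sketch of how the same extraction can be done without UDFs, using Spark's built-in date functions. The long column name is taken from the post, and the cast assumes the column holds epoch milliseconds, as getCurrentHour above implies:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_unixtime, year, month, dayofmonth, hour}

object TimestampPartsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("timestamp-parts").master("local[*]").getOrCreate()
    import spark.implicits._

    // Stand-in for the `training` DataFrame from the post; the column is assumed
    // to hold epoch milliseconds.
    val training = Seq(1527494280000L, 1527520680000L)
      .toDF("payload_MeterReading_IntervalBlock_IReading_endTime")

    val withParts = training
      .withColumn("ts", from_unixtime(col("payload_MeterReading_IntervalBlock_IReading_endTime") / 1000))
      .withColumn("year", year(col("ts")))
      .withColumn("month", month(col("ts")))
      .withColumn("day", dayofmonth(col("ts")))
      .withColumn("hour", hour(col("ts")))

    withParts.show(truncate = false)
    spark.stop()
  }
}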
07-05-2016
09:51 PM
Seeing as how I will be working with gigabytes/terabytes of data, I think a native datatype would be best then. Thank you!
07-04-2016
05:03 AM
1 Kudo
Yes. A projection before any sort of transformation/action helps with both computation time and storage, since only the columns you actually need are carried through the rest of the job. A short sketch is below.
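A short sketch of the idea with illustrative column names; projecting early means later stages shuffle and cache less data:

import org.apache.spark.sql.SparkSession

object EarlyProjectionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("early-projection").master("local[*]").getOrCreate()
    import spark.implicits._

    // Wide illustrative DataFrame; only "id" and "amount" are actually needed.
    val wide = Seq((1, "a", 10.0, "junk"), (2, "b", 20.0, "junk"))
      .toDF("id", "label", "amount", "unused")

    // Project first, then transform.
    val result = wide
      .select("id", "amount")
      .groupBy("id")
      .sum("amount")

    result.explain()  // the scan carries only the selected columns
    spark.stop()
  }
}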
06-29-2016
08:25 PM
1 Kudo
The methods you mention will not alter the sort order for a join operation, since data is always shuffled for a join. For ways to enforce sort order, you can read this post on HCC: https://community.hortonworks.com/questions/42464/spark-dataframes-how-can-i-change-the-order-of-col.html To answer your questions about coalesce() and repartition(), both are used to modify the number of partitions stored by the RDD. The repartition() method can increase or decrease the number of partitions, and it shuffles data across nodes, meaning data stored on one node can be moved to another; this makes it expensive for large RDDs. The coalesce() method can only be used to decrease the number of partitions and does not shuffle. That makes it cheaper than repartition, but it may result in uneven partitions since no data is moved across nodes. A small sketch contrasting the two is below.
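A small sketch contrasting the two calls on an illustrative RDD; the partition counts are printed so the effect is visible:

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionCoalesceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitions").setMaster("local[4]"))

    val rdd = sc.parallelize(1 to 1000, numSlices = 8)
    println(s"original:    ${rdd.getNumPartitions}")                  // 8

    // repartition can grow or shrink the partition count; it always shuffles.
    println(s"repartition: ${rdd.repartition(16).getNumPartitions}")  // 16

    // coalesce only shrinks (by default) and merges partitions without a shuffle,
    // so the resulting partitions can be uneven in size.
    println(s"coalesce:    ${rdd.coalesce(2).getNumPartitions}")      // 2

    sc.stop()
  }
}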
07-15-2016
04:39 PM
1 Kudo
@venki2404, all you need to do is use select (it worked for me). Do the following:

val new_df = df.select("a", "b", "c", "d", "e")  // assuming the column order you need is a, b, c, d, e