Created 07-20-2016 10:08 PM
I know that DataFrame methods that return a DataFrame are lazy. But what about groupBy(...).agg(...), which involves an intermediate GroupedData object, or join()?
I have also read that you should cache RDDs/DataFrames when you repeatedly reference the same DataFrame from multiple separate branches. What about when applying many transformations and actions in a linear fashion?
Created 07-20-2016 10:33 PM
Hello @jestin ma
My answer would be that the DataFrame groupBy().agg(...) methods do not trigger actions by themselves, although they are often followed by one. DataFrames are built on top of RDDs (the low-level API), and the RDD actions, which include count, collect, take, etc., are listed at the link below.
http://spark.apache.org/docs/latest/programming-guide.html#actions
In fact, the cache() method is not an action either; think of it as a stored instruction to keep the results resident once they are materialized by an action. When applying a long list of transformations and actions (very common), cache() tells Spark to keep this DataFrame/RDD in memory (at the default storage level) for as long as possible. Partitions of DataFrames that are not cached can be evicted or spilled to disk, depending on available memory and settings.
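For illustration, here is a minimal PySpark sketch of that behavior (assuming Spark 2.x; the data and column names are made up). Neither the aggregation nor cache() launches a job; only the final count() does:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("eng", 100), ("eng", 200), ("sales", 150)],
                           ["dept", "salary"])

agg_df = df.groupBy("dept").agg({"salary": "avg"})  # lazy transformation
agg_df.cache()  # also lazy: a stored instruction, not an action

agg_df.count()  # the action: computes the aggregation and populates the cache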
Created 07-21-2016 04:04 AM
What if I had some DataFrame df and did something like df.transform().transform()...transform().action()?
Would I need to cache that if I had a linear progression?
Created 08-01-2016 07:42 PM
Hello @jestin ma No, you do not have to call the cache() method in order to use a long list of transformations followed by an action. One of the nice things about DataFrames (and the underlying RDDs) is that they are "resilient", meaning that they record the lineage of transformations needed to rebuild themselves, and will do so if needed. So even if the DataFrame is evicted from memory, or otherwise not available, any action taken against it will re-run the transformations necessary to rebuild it. The cache() method essentially raises the priority of the DataFrame, instructing Spark to keep it resident so that future actions execute more quickly.
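As a hypothetical sketch of such a linear pipeline (assuming an existing SparkSession named spark; the data and column names are invented), a single action at the end drives the whole chain and no cache() is needed:

from pyspark.sql.functions import col

df = spark.createDataFrame([("a", 10), ("b", -5), ("a", 20)], ["user", "amount"])

result = (df.filter(col("amount") > 0)                 # lazy
            .withColumn("doubled", col("amount") * 2)  # lazy
            .groupBy("user")                           # lazy
            .agg({"doubled": "sum"}))                  # lazy

result.show()  # the one action; if partitions are lost, Spark rebuilds them
               # from the recorded lineage, so no explicit cache() is required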
Created 07-21-2016 07:26 AM
Hi @jestin ma.
When you are using DataFrames in Spark, there are two types of operations: transformations and actions.
Transformations are lazy and are executed only when an action runs on the result. As far as my understanding goes, pretty much all column and row operations (including select, groupBy, agg, drop, etc.) are evaluated lazily and only executed when you use an action such as show, take, collect, count, describe, and others.
Regarding caching, it is advisable to persist DataFrames in memory when you intend to use them multiple times. It doesn't matter whether you call the transformations and the action in one chained expression or spread them over multiple statements, as the sketch below shows.
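For example (a sketch with made-up names, assuming a df with columns a and b), these two styles build exactly the same lazy plan:

from pyspark.sql.functions import col

# Style 1: chained in one expression; still lazy until count()
n1 = df.select("a", "b").filter(col("a") > 0).count()

# Style 2: the same steps split over multiple statements
step1 = df.select("a", "b")         # lazy
step2 = step1.filter(col("a") > 0)  # lazy
n2 = step2.count()                  # the action; execution happens here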
Created 07-21-2016 12:49 PM
Hi ymoiseev. Thank you for your answer regarding transformations.
Regarding caching, so if I have a DataFrame df and I have something like
df.groupBy(...).agg(...).select(...).drop(...).withColumn(...).groupBy(...).join(...).select(...)
If I then call an action afterwards, should I cache it?
Created 07-21-2016 01:59 PM
If you intend to re-use this DataFrame later on, the short answer is yes. The long answer is: it depends.
Caching by itself is also a lazy operation. When you call it, the file is still not read. The difference is that you have now told Spark to cache the contents once they are computed. So when you run some action on it, the data will be loaded and cached; when you run the same action a second time, it will be taken straight from the cache.
Hope it makes it clearer, but let me know if you need more clarification.
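A small sketch of that sequence (assuming Spark 2.x and an existing SparkSession named spark; the file path is hypothetical):

df = spark.read.csv("/path/to/events.csv", header=True)  # hypothetical path

df.cache()  # lazy: the file is still not read
df.count()  # first action: reads the file, computes, and fills the cache
df.count()  # second action: answered from the cache, no re-read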
Created 07-22-2016 09:23 PM
Hi, that makes it a bit clearer. I just want to clarify the opposite case, where I have
df.groupBy().agg().select().drop().join()
Then I perform one action and I'm done. In this case, I wouldn't need to cache, is that correct?
Created 07-26-2016 06:04 AM
Sorry for the late reply.
Not sure how this case is the opposite, but once again it will depend on whether you plan to use the result of those transformations later. For example, I would use caching in this case:
df1 = df.groupBy(...).agg(...)  # ... other transformations
df1.cache()  # you can treat this as if we created a checkpoint here
df2 = df1.filter(df1.col > 10).count()  # df1 is materialized (and cached) here
df3 = df1.filter(df1.col < 5).count()   # df1 is re-used from the cache at this point
BTW, I found this Stack Overflow discussion useful for understanding when to use caching:
http://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd