Created 07-10-2018 12:34 PM
I wrote a small Zeppelin paragraph that experiments with caching (reading Hive tables). Here is my code:
%spark
// Caching test #4: df.cache with 10 partitions
val input1 = sqlContext.sql("SELECT * FROM db.table").repartition(10)
val input2 = input1.cache()
input2.count()
The first time I run this paragraph, it takes about 10 minutes to finish. When I run it again, it always needs between 0.5 and 1 second.
I added another paragraph to "initialize" the Spark interpreter after restarting it (I want to avoid these different run-times for the paragraph above).
%spark
sc.version
sqlContext.sql("select * from db.table limit 1").show()
This paragraph also takes more time on the first run and less on the following ones.
BUT: the main paragraph (with the caching test) still takes just as long on its first run!
Am I doing something wrong here? Is Zeppelin re-using the cached elements in later iterations (even though I "overwrite" the objects by reading the whole table again before calling the cache method)?
Is there a difference between the two caching calls df.cache() and sqlContext.cacheTable(name)? A minimal sketch of what I mean by the two variants, against the same db.table as above (whether cacheTable accepts a database-qualified name may depend on the Spark version):
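%spark
// Variant 1: cache the DataFrame produced by the query.
// cache() is lazy; the data is materialized on the first action.
val df = sqlContext.sql("SELECT * FROM db.table")
df.cache()
df.count()

// Variant 2: cache a catalog table by name.
// Also lazy in the Scala API; the cache fills on the next action over the table.
sqlContext.cacheTable("db.table")
sqlContext.sql("SELECT * FROM db.table").count()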
Thanks for your help!
Created 07-10-2018 12:54 PM
The first time you launch any action in the Spark interpreter, a Spark application is launched in the YARN cluster. If you are concerned about the timings, you could check:
1. The Spark interpreter log under /var/log/zeppelin/zeppelin-spark*
2. The Spark UI, another great place to see the actual jobs submitted by the driver. To get there, go to the RM UI, locate the Zeppelin Spark application, and click on the Application Master link.
Between these two, hopefully you will have more information as to why it is taking this long. Adding more memory or more executors could help reduce the times, but consider these things only after you have checked the above and have a better understanding of what is happening.
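For example, if the Spark UI shows the first job is starved for resources, the executor settings can be raised in Zeppelin's Spark interpreter configuration (illustrative values only; the right numbers depend on your cluster):

spark.executor.instances = 4
spark.executor.memory = 4g
spark.executor.cores = 2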
HTH
*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.
Created 07-10-2018 01:04 PM
Thank you for the fast answer. The problem seems to be the cache method, as it always takes long in the first iteration and is faster in all following iterations. This behavior is independent of which paragraphs I run before... It's clear to me that the first action launches the application (and therefore starts the executors etc.). But it's not clear to me why Spark "knows" that it can re-use the cached DataFrame from the earlier iterations, because I overwrite the variables (or even use different ones).
Created 07-10-2018 01:19 PM
@Daniel Müller Zeppelin is going to reuse the Spark application that was already launched, so the driver and executors will be the same ones used for the first execution. As you noticed, between paragraphs, if you reuse a variable/value declared/initialized before, Zeppelin is not going to re-compute it because it is already in memory.
The best place to check is the Spark UI, as it will tell you exactly whether any actual job is being launched; if you don't see any jobs launched, you will know the result is served from cache/memory.
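If the goal is comparable timings across runs, the cached data can be dropped explicitly before re-running the test, so nothing is served from memory (a minimal sketch; input2 is the cached DataFrame from the first paragraph):

%spark
// Drop one cached DataFrame; blocking = true waits until the blocks are gone
input2.unpersist(true)
// Or drop everything Spark SQL has cached in this application
sqlContext.clearCache()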
HTH
Created 07-10-2018 01:54 PM
Yes, I'm familiar with Spark. What I wondered about was the caching behavior. Spark really seems to know which HiveQL statement belongs to the cached data, and re-uses it automatically when the same query is submitted again:
// Cache the table for the first time => takes some time!
val df1_1 = sqlContext.sql("SELECT a, b FROM db.table limit 1000000")
val df1_2 = df1_1.cache()
df1_2.count()

// This re-uses the cached object, as the request is the same as before => very fast!
val df2_1 = sqlContext.sql("SELECT a, b FROM db.table limit 1000000")
val df2_2 = df2_1.cache()
df2_2.count()

// This caches the data, because the request is different (another limit clause) => takes some time!
val df3_1 = sqlContext.sql("SELECT a, b FROM db.table limit 10")
val df3_2 = df3_1.cache()
df3_2.count()
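This plan-based matching can also be seen in the physical plan: when a new query matches an already-cached one, explain() shows an InMemoryTableScan / InMemoryRelation instead of a fresh Hive table scan (a small check, re-using the first query from above):

%spark
val check = sqlContext.sql("SELECT a, b FROM db.table limit 1000000")
// If the analyzed plan matches a cached plan, the output contains
// InMemoryTableScan / InMemoryRelation rather than a full scan of db.table.
check.explain()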
Thanks for your help @Felix Albani