Created 07-05-2017 02:38 PM
I'm still trying to get Zeppelin's tutorial working.
With a few tweaks I was able to get the stream working, but I can't run queries...
Here's the code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._

/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey,
    "apiSecret" -> apiSecret,
    "accessToken" -> accessToken,
    "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}

// Configure Twitter credentials
val apiKey = "wsb68B0Xv7JMm9y0GI8UWVv9M"
val apiSecret = "LxNt80GDB0noKzJOrgjJfldgZvJHTRPox8hpeidkEzD7jbJ5wk"
val accessToken = "361298122-W2KrbTiAtmtydVhbNeAEHvIHy1VbMcTgitbRrQcW"
val accessTokenSecret = "n6hjL3CXONlxHT37lbQ2PnjCfGiIMlSxxNiNbgIWHSaA9"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

import org.apache.spark.streaming.twitter._

val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl", "2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt: Long, text: String)

twt.map(status => Tweet(status.getCreatedAt().getTime() / 1000, status.getText())).foreachRDD { rdd =>
  // Below line works only in spark 1.3.0.
  // For spark 1.1.x and spark 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  //rdd.toDF().registerAsTable("tweets")
  //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext._
  import sqlContext.implicits._
  rdd.toDF.registerTempTable("tweets")
}
twt.print
ssc.start()
And the error I'm getting:
org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Can anyone help?
Many thanks!
Created 07-05-2017 03:11 PM
It appears you are using a Spark version lower than 2.1.
In your code you have the following line:
val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl", "2000")
spark.cleaner.ttl triggers a periodic cleanup of metadata older than the duration you specify (here, 2000 seconds).
From the official Spark 1.6 documentation:
spark.cleaner.ttl - Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
Default is infinite.
In your case, it is quite possible that the cleanup is being triggered even before your job finishes.
Increase the value and try again.
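For example, here is a minimal sketch of the two options (the 3600-second value is purely illustrative, not a tested recommendation):

import org.apache.spark.SparkConf

// Option 1: drop spark.cleaner.ttl entirely and rely on the infinite default.
val conf = new SparkConf().setMaster("local[2]").setAppName("tweets")

// Option 2: if you do set it, make the duration comfortably longer than
// the expected lifetime of the streaming job (value is in seconds).
val confWithTtl = new SparkConf()
  .setMaster("local[2]")
  .setAppName("tweets")
  .set("spark.cleaner.ttl", "3600") // illustrative value only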
Refer to this JIRA for an existing discussion to get more insight.
Created 07-05-2017 03:47 PM
First of all, many thanks for your answer.
Yes, I added that value myself: with no value set (the infinite default) I got the same error, so I tried giving it a value just to check... but the problem persists.
My Spark version is 1.6.x.2.4, and the tutorial is this one:
https://zeppelin.apache.org/docs/0.5.5-incubating/tutorial/tutorial.html
Many thanks once more.
Best regards
Created 07-05-2017 03:49 PM
Alright, I will go through this use case and try to replicate the issue.
Created 07-05-2017 08:24 PM
Thank you for sharing the tutorial. I was able to replicate the issue, and it turned out to be caused by incompatible jars.
I am using the following exact versions, which I pass to spark-shell:
spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar
twitter4j-core-4.0.4.jar
twitter4j-stream-4.0.4.jar
For testing purposes, I put them all under /tmp, and here is how I started the spark shell:
Syntax:
spark-shell --jars "/path/jar1,/path/jar2,/path/jar3"

Example:
spark-shell --jars "/tmp/spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar,/tmp/twitter4j-core-4.0.4.jar,/tmp/twitter4j-stream-4.0.4.jar"
Once I got the scala prompt in spark-shell, I typed out the code from the tutorial without specifying any value for "spark.cleaner.ttl".
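One detail worth noting: inside spark-shell a SparkContext named sc already exists, so the streaming setup reduces to roughly the sketch below (no new SparkConf, and no spark.cleaner.ttl):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Reuse the SparkContext that spark-shell already provides as `sc`.
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))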
Hope this helps.
Created 07-06-2017 09:17 AM
Many thanks once more.
I'm not using spark-shell... I'm using the Zeppelin view and loading the jars with z.load.
However, the spark-streaming-twitter jar is not on Maven Central (it's a Hortonworks build), and therefore I can't add it with z.load...
I tried with the old jar I had (spark-streaming-twitter_2.10:1.4.1), and I'm getting the same error on Zeppelin 0.6 (Ambari) when I run the first SQL command of the tutorial:
%sql select * from tweets where text like '%girl%' limit 10
Could the problem come from the jar? And how do I add it in Zeppelin, since it's not on Central?
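Would something like Zeppelin's %dep interpreter with a custom repository work? Here's a sketch of what I mean; the repository URL and the Maven coordinates of the Hortonworks build are my guesses, not something I've verified:

%dep
// Must run before the Spark interpreter starts; z.reset() clears earlier loads.
z.reset()
// Register an extra repository (URL assumed to be the Hortonworks releases repo).
z.addRepo("hortonworks").url("http://repo.hortonworks.com/content/repositories/releases/")
// Load the HDP build by its Maven coordinates (group/artifact/version assumed)...
z.load("org.apache.spark:spark-streaming-twitter_2.10:1.6.1.2.4.2.10-1")
// ...or point z.load at a local jar file instead:
// z.load("/tmp/spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar")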
Many thanks once more.