
Spark error: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2

Rising Star

I'm still trying to get the Zeppelin tutorial working.

With a few tweaks I was able to get the stream working, but I can't run queries...

Here's the code:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._

/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}

// Configure Twitter credentials
val apiKey = "wsb68B0Xv7JMm9y0GI8UWVv9M"
val apiSecret = "LxNt80GDB0noKzJOrgjJfldgZvJHTRPox8hpeidkEzD7jbJ5wk"
val accessToken = "361298122-W2KrbTiAtmtydVhbNeAEHvIHy1VbMcTgitbRrQcW"
val accessTokenSecret = "n6hjL3CXONlxHT37lbQ2PnjCfGiIMlSxxNiNbgIWHSaA9"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)
twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD { rdd=>
  // The line below works only in Spark 1.3.0.
  // For Spark 1.1.x and 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  //rdd.toDF().registerAsTable("tweets")
  
  //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext._
  import sqlContext.implicits._

  rdd.toDF.registerTempTable("tweets")
}

twt.print

ssc.start()

And the error I'm getting:

org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Can anyone help?

Many thanks!

5 Replies


@Hugo Felix

It appears you are using a Spark version lower than 2.1.

In your code you have the following line:

val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")

spark.cleaner.ttl triggers a periodic cleanup of metadata older than the duration you specify (here, 2000 seconds).

From the official Spark 1.6 documentation:

spark.cleaner.ttl - Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.

The default is infinite.

In your case, it is quite possible that the cleanup is being triggered even before your job finishes.

Increase the value and try again.
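
For example, something like this (just a sketch; 3600 is an arbitrary larger value, not a recommendation):

// Same conf as in your code, but with a larger TTL. Alternatively,
// drop the .set(...) call entirely to keep the default (infinite).
val conf = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl", "3600")
val ssc = new StreamingContext(conf, Seconds(2))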

Refer to this JIRA for an existing discussion to get more insight.

Rising Star

@Dinesh Chitlangia

First of all, many thanks for your answer.

Yes, I added that value myself: with no value set (infinite) I got the same error, so I gave it a value just to check, but the problem persists.

My Spark version is 1.6.x.2.4 and the tutorial is this one:

https://zeppelin.apache.org/docs/0.5.5-incubating/tutorial/tutorial.html

Many thanks once more.

Best regards


Alright, I will go through this use case and try to replicate the issue.


@Hugo Felix

Thank you for sharing the tutorial. I was able to replicate the issue and found it to be caused by incompatible jars.

I am using the following exact versions, which I pass to spark-shell:

spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar

twitter4j-core-4.0.4.jar

twitter4j-stream-4.0.4.jar

For testing purposes, I put them all under /tmp, and here is how I started the Spark shell:

Syntax:

spark-shell --jars "/path/jar1,/path/jar2,/path/jar3"

Example:

spark-shell --jars "/tmp/spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar,/tmp/twitter4j-core-4.0.4.jar,/tmp/twitter4j-stream-4.0.4.jar"

Once I got the Scala prompt in spark-shell, I typed in the code from the tutorial without specifying any value for "spark.cleaner.ttl".
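
In other words, the streaming context setup looked roughly like this (the same code as in your question, minus the TTL setting):

// No spark.cleaner.ttl at all, so the default (infinite) applies.
val conf = new SparkConf().setMaster("local[2]").setAppName("tweets")
val ssc = new StreamingContext(conf, Seconds(2))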

Hope this helps.

Rising Star

@Dinesh Chitlangia

Many thanks once more.

I'm not using spark-shell... I'm using the Zeppelin view and loading the jars with z.load.

However, the spark-streaming-twitter jar is not on Maven Central (it is a Hortonworks build), and therefore I can't add it with z.load ...
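
For context, my %dep paragraph looks roughly like this (a sketch; the 1.4.1 coordinates are just the old artifact I mention below):

%dep
z.reset()
// z.load with Maven coordinates only resolves artifacts published on Central.
z.load("org.apache.spark:spark-streaming-twitter_2.10:1.4.1")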

I tried with the old jar I had (2.10:1.4.1) and I'm getting the same error on Zeppelin 0.6 (Ambari) when I run the first SQL command of the tutorial:

%sql select * from tweets where text like '%girl%' limit 10

Can it be caused by the jar? And how do I add it in Zeppelin, given that it's not on Central?
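
From the Zeppelin docs on dynamic dependency loading, %dep can also register an extra repository, or load a jar straight from the local filesystem. Would something like this work? (The Hortonworks repository URL and the exact coordinates below are guesses on my part.)

%dep
z.reset()
// Register an additional Maven repository before loading (URL is a guess):
z.addRepo("hortonworks").url("http://repo.hortonworks.com/content/repositories/releases/")
z.load("org.apache.spark:spark-streaming-twitter_2.10:1.6.1.2.4.2.10-1")
// Or point z.load at a jar on the local filesystem instead:
// z.load("/tmp/spark-streaming-twitter_2.10-1.6.1.2.4.2.10-1.jar")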

Many thanks once more.