Created 06-30-2017 03:25 PM
I'm trying to run the following twitter tutorial:
import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.storage.StorageLevel import scala.io.Source import scala.collection.mutable.HashMap import java.io.File import org.apache.log4j.Logger import org.apache.log4j.Level import sys.process.stringSeqToProcess /** Configures the Oauth Credentials for accessing Twitter */ def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) { val configs = new HashMap[String, String] ++= Seq( "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret) println("Configuring Twitter OAuth") configs.foreach{ case(key, value) => if (value.trim.isEmpty) { throw new Exception("Error setting authentication - value for " + key + " not set") } val fullKey = "twitter4j.oauth." + key.replace("api", "consumer") System.setProperty(fullKey, value.trim) println("\tProperty " + fullKey + " set as [" + value.trim + "]") } println() } // Configure Twitter credentials val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx" val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) import org.apache.spark.streaming.twitter._ val ssc = new StreamingContext(sc, Seconds(2)) val tweets = TwitterUtils.createStream(ssc, None) val twt = tweets.window(Seconds(60)) case class Tweet(createdAt:Long, text:String) twt.map(status=> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) ).foreachRDD(rdd=> // Below line works only in spark 1.3.0. // For spark 1.1.x and spark 1.2.x, // use rdd.registerTempTable("tweets") instead. rdd.toDF().registerAsTable("tweets") ) twt.print ssc.start()
However I get the following error:
<console>:504: error: value toDF is not a member of org.apache.spark.rdd.RDD[Tweet] rdd.toDF().registerAsTable("tweets")
I have Spark 1.6.x.2.4 and Zeppelin 0.6
Created 07-05-2017 02:33 PM
Now it's working...
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.storage.StorageLevel import scala.io.Source import scala.collection.mutable.HashMap import java.io.File import org.apache.log4j.Logger import org.apache.log4j.Level import sys.process.stringSeqToProcess import org.apache.spark.sql.DataFrame import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext._ /** Configures the Oauth Credentials for accessing Twitter */ def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) { val configs = new HashMap[String, String] ++= Seq( "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret) println("Configuring Twitter OAuth") configs.foreach{ case(key, value) => if (value.trim.isEmpty) { throw new Exception("Error setting authentication - value for " + key + " not set") } val fullKey = "twitter4j.oauth." + key.replace("api", "consumer") System.setProperty(fullKey, value.trim) println("\tProperty " + fullKey + " set as [" + value.trim + "]") } println() } // Configure Twitter credentials val apiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val apiSecret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val accessToken = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val accessTokenSecret = "XXXXXXXXXXXXXXXXXXXXXXX" configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) import org.apache.spark.streaming.twitter._ val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000") val ssc = new StreamingContext(sc, Seconds(2)) val tweets = TwitterUtils.createStream(ssc, None) val twt = tweets.window(Seconds(60)) case class Tweet(createdAt:Long, text:String) twt.map(status=> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) ).foreachRDD { rdd=> // Below line works only in spark 1.3.0. // For spark 1.1.x and spark 1.2.x, // use rdd.registerTempTable("tweets") instead. //rdd.toDF().registerAsTable("tweets") //val sqlContext = new org.apache.spark.sql.SQLContext(sc) val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext._ import sqlContext.implicits._ rdd.toDF.registerTempTable("tweets") } twt.print ssc.start()
Created 07-05-2017 02:33 PM
Now it's working...
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.storage.StorageLevel import scala.io.Source import scala.collection.mutable.HashMap import java.io.File import org.apache.log4j.Logger import org.apache.log4j.Level import sys.process.stringSeqToProcess import org.apache.spark.sql.DataFrame import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext._ /** Configures the Oauth Credentials for accessing Twitter */ def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) { val configs = new HashMap[String, String] ++= Seq( "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret) println("Configuring Twitter OAuth") configs.foreach{ case(key, value) => if (value.trim.isEmpty) { throw new Exception("Error setting authentication - value for " + key + " not set") } val fullKey = "twitter4j.oauth." + key.replace("api", "consumer") System.setProperty(fullKey, value.trim) println("\tProperty " + fullKey + " set as [" + value.trim + "]") } println() } // Configure Twitter credentials val apiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val apiSecret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val accessToken = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" val accessTokenSecret = "XXXXXXXXXXXXXXXXXXXXXXX" configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) import org.apache.spark.streaming.twitter._ val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000") val ssc = new StreamingContext(sc, Seconds(2)) val tweets = TwitterUtils.createStream(ssc, None) val twt = tweets.window(Seconds(60)) case class Tweet(createdAt:Long, text:String) twt.map(status=> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) ).foreachRDD { rdd=> // Below line works only in spark 1.3.0. // For spark 1.1.x and spark 1.2.x, // use rdd.registerTempTable("tweets") instead. //rdd.toDF().registerAsTable("tweets") //val sqlContext = new org.apache.spark.sql.SQLContext(sc) val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext._ import sqlContext.implicits._ rdd.toDF.registerTempTable("tweets") } twt.print ssc.start()