Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

Zeppelin Tutorial : error: value toDF is not a member of org.apache.spark.rdd.RDD[Tweet]

avatar
Rising Star

I'm trying to run the following twitter tutorial:

import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess

/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}

// Configure Twitter credentials
val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

import org.apache.spark.streaming.twitter._
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)
twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD(rdd=>
  // Below line works only in spark 1.3.0.
  // For spark 1.1.x and spark 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  rdd.toDF().registerAsTable("tweets")
)

twt.print

ssc.start()

However I get the following error:

<console>:504: error: value toDF is not a member of org.apache.spark.rdd.RDD[Tweet]
                rdd.toDF().registerAsTable("tweets")

I have Spark 1.6.x.2.4 and Zeppelin 0.6

1 ACCEPTED SOLUTION

avatar
Rising Star

Now it's working...

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._

/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }

  println()
}

// Configure Twitter credentials
val apiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val apiSecret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessToken = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessTokenSecret = "XXXXXXXXXXXXXXXXXXXXXXX"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

import org.apache.spark.streaming.twitter._
val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)
twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD { rdd=>
  // Below line works only in spark 1.3.0.
  // For spark 1.1.x and spark 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  //rdd.toDF().registerAsTable("tweets")
  
  //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext._
    import sqlContext.implicits._
  
  rdd.toDF.registerTempTable("tweets")
}

twt.print

ssc.start()

View solution in original post

1 REPLY 1

avatar
Rising Star

Now it's working...

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._

/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach{ case(key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }

  println()
}

// Configure Twitter credentials
val apiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val apiSecret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessToken = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessTokenSecret = "XXXXXXXXXXXXXXXXXXXXXXX"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

import org.apache.spark.streaming.twitter._
val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))

case class Tweet(createdAt:Long, text:String)
twt.map(status=>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD { rdd=>
  // Below line works only in spark 1.3.0.
  // For spark 1.1.x and spark 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  //rdd.toDF().registerAsTable("tweets")
  
  //val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext._
    import sqlContext.implicits._
  
  rdd.toDF.registerTempTable("tweets")
}

twt.print

ssc.start()