Created 09-23-2016 10:02 PM
I am getting the following error during compilation; the build.sbt file and the source code are also below:
Error:
[info] Done updating.
[info] Compiling 4 Scala sources to /root/weblogs/target/scala-2.11/classes...
[error] /root/weblogs/src/main/scala/LogSQL.scala:60: value createOrReplaceTempView is not a member of org.apache.spark.sql.DataFrame
[error] requestsDataFrame.createOrReplaceTempView("requests")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
Scala code:
[root@hadoop1 scala]# more LogSQL.scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import java.util.regex.Pattern
import java.util.regex.Matcher
import Utilities._

/** Illustrates using SparkSQL with Spark Streaming, to issue queries on
 *  Apache log data extracted from a stream on port 9999.
 */
object LogSQL {

  def main(args: Array[String]) {

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext("local[*]", "LogSQL", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    // Create a socket stream to read log data published via netcat on port 9999 locally
    val lines = ssc.socketTextStream("127.0.0.1", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    // Extract the (URL, status, user agent) we want from each log line
    val requests = lines.map(x => {
      val matcher: Matcher = pattern.matcher(x)
      if (matcher.matches()) {
        val request = matcher.group(5)
        val requestFields = request.toString().split(" ")
        val url = util.Try(requestFields(1)) getOrElse "[error]"
        (url, matcher.group(6).toInt, matcher.group(9))
      } else {
        ("error", 0, "error")
      }
    })

    // Process each RDD from each batch as it comes in
    requests.foreachRDD((rdd, time) => {
      // So we'll demonstrate using SparkSQL in order to query each RDD
      // using SQL queries.

      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // SparkSQL can automatically create DataFrames from Scala "case classes".
      // We created the Record case class for this purpose.
      // So we'll convert each RDD of tuple data into an RDD of "Record"
      // objects, which in turn we can convert to a DataFrame using toDF()
      val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3)).toDF()

      // Create a SQL table from this DataFrame
      requestsDataFrame.createOrReplaceTempView("requests")

      // Count up occurrences of each user agent in this RDD and print the results.
      // The powerful thing is that you can do any SQL you want here!
      // But remember it's only querying the data in this RDD, from this batch.
      val wordCountsDataFrame =
        sqlContext.sql("select agent, count(*) as total from requests group by agent")
      println(s"========= $time =========")
      wordCountsDataFrame.show()

      // If you want to dump data into an external database instead, check out the
      // org.apache.spark.sql.DataFrameWriter class! It can write dataframes via
      // jdbc and many other formats! You can use the "append" save mode to keep
      // adding data from each batch.
    })

    // Kick it off
    ssc.checkpoint("C:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(url: String, status: Int, agent: String)

/** Lazily instantiated singleton instance of SQLContext
 *  (Straight from included examples in Spark) */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
build.sbt
[root@hadoop1 weblogs]# more build.sbt
name := "weblogs"

version := "1.0"

scalaVersion := "2.11.6"

resolvers ++= Seq(
  "Apache HBase" at "http://repository.apache.org/content/repositories/releases",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.2",
  "org.apache.spark" %% "spark-streaming" % "1.6.2",
  "org.apache.spark" %% "spark-sql" % "1.6.2",
  "org.apache.spark" %% "spark-mllib" % "1.6.2"
)
Created 09-26-2016 06:15 PM
Is the following missing from your import statements?
import org.apache.spark.sql.DataFrame
Created 09-27-2016 04:05 PM
I added this line, but I am still getting the same error:
[info] Compiling 4 Scala sources to /root/weblogs/target/scala-2.11/classes...
[error] /root/weblogs/src/main/scala/LogSQL.scala:60: value createOrReplaceTempView is not a member of org.apache.spark.sql.DataFrame
[error] requestsDataFrame.createOrReplaceTempView("requests")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 14 s, completed Sep 27, 2016 12:04:22 PM
Created 09-27-2016 07:01 PM
I found two solutions on the web for a similar problem; do they apply to my code?
1- Import implicits:
Note that this should be done only after an instance of org.apache.spark.sql.SQLContext is created. It should be written as:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
2- Move the case class outside of the method:
The case class that you use to define the schema of the DataFrame should be declared outside of the method that needs it. (A sketch combining both suggestions follows below.)
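For reference, here is a minimal sketch of what those two suggestions look like together, written against the Spark 1.6 SQLContext API that matches the build.sbt above. The object name ImplicitsSketch and the method processBatch are placeholders rather than anything from the original project, and Record simply mirrors the case class in LogSQL.scala:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Suggestion 2: the case class that defines the DataFrame schema lives at the
// top level, outside of any method, so that toDF() can derive a schema for it.
case class Record(url: String, status: Int, agent: String)

object ImplicitsSketch {
  def processBatch(sc: SparkContext, rdd: RDD[(String, Int, String)]): Unit = {
    // Suggestion 1: create the SQLContext first, then import its implicits;
    // the import has to come after the instance exists.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3)).toDF()
    // On Spark 1.6 the DataFrame method for registering a temporary table is
    // registerTempTable; createOrReplaceTempView only appears in Spark 2.0+.
    requestsDataFrame.registerTempTable("requests")
    sqlContext.sql("select agent, count(*) as total from requests group by agent").show()
  }
}
Note that neither suggestion by itself makes createOrReplaceTempView compile against the 1.6.2 artifacts in build.sbt, because that method is not part of the 1.6 DataFrame API.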
Created 09-29-2016 06:45 PM
I fixed this issue by upgrading from Spark 1.6.2 to Spark 2.0. I actually upgraded my HDC 2.4 cluster to HDC 2.5.
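For completeness, here is a minimal sketch of roughly what the body of foreachRDD looks like once the project builds against Spark 2.x artifacts (for example spark-core, spark-streaming and spark-sql at version 2.0.0 in build.sbt). The object name Spark2Sketch and the method processBatch are placeholders; createOrReplaceTempView belongs to the Spark 2.x Dataset/DataFrame API, which is why the original call compiles after the upgrade:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Same Record case class as in LogSQL.scala above.
case class Record(url: String, status: Int, agent: String)

object Spark2Sketch {
  def processBatch(rdd: RDD[(String, Int, String)]): Unit = {
    // In Spark 2.x, SparkSession replaces the SQLContext singleton.
    val spark = SparkSession.builder()
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    import spark.implicits._

    val requestsDataFrame = rdd.map(w => Record(w._1, w._2, w._3)).toDF()
    // createOrReplaceTempView exists on the Spark 2.x DataFrame (Dataset[Row]) API.
    requestsDataFrame.createOrReplaceTempView("requests")

    spark.sql("select agent, count(*) as total from requests group by agent").show()
  }
}
SparkSession.builder().getOrCreate() returns the already-running session when one exists, so calling it inside foreachRDD once per batch does not create a new session each time.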
Created 09-30-2016 05:55 PM
The upgrade addressed it, but I guess that we still don't know the cause.