
Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

Solved

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor
Can you show me how you write a case class to define the schema, and how you use it in your method? Thanks so much.

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

Hi,

Can you share your program?

I am getting a single error, shown below:

[info] Compiling 1 Scala source to /home/sumeet/SimpleSparkProject/target/scala-2.11/classes...
[error] /home/sumeet/SimpleSparkProject/src/main/scala/SimpleApp.scala:16: value toDF is not a member of org.apache.spark.rdd.RDD[Auction]
[error] val auction = ebay.toDF()
[error] ^


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

object SimpleApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Simple App", "/usr/local/spark-1.4.0-incubating",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val ebayText = sc.textFile("/home/sumeet/Desktop/useful huge sample data/ebay.csv")
    ebayText.first()
    case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float)
    val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat))
    ebay.first()
    ebay.count()
    val auction = ebay.toDF()
    auction.show()
  }
}
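
A commonly reported cause of this exact message in compiled code is that the case class is defined inside main: the conversion brought in by import sqlContext.implicits._ needs a TypeTag for the element type, and it cannot be obtained for a method-local class, so toDF never becomes available on the RDD. Below is a minimal sketch of the usual rearrangement, not a confirmed fix for this program, using the same Auction fields and file path as above with the Spark 1.x SQLContext API:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Case class defined at top level, outside any method, so Spark can reflect on it.
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
                   bidderrate: Integer, openbid: Float, price: Float)

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Simple App")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // brings the RDD-to-DataFrame conversion into scope

    val ebay = sc.textFile("/home/sumeet/Desktop/useful huge sample data/ebay.csv")
      .map(_.split(","))
      .map(p => Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat))

    val auction = ebay.toDF()       // compiles once Auction is top level and the implicits are imported
    auction.show()
  }
}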

 

 

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

Hi, thank you! It resolved the similar issue I was facing. However, could you please share why this is done, and what exactly the implicit import does in this case? A reply would be appreciated. Sorry for reopening this post.
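
In short, an RDD has no toDF method of its own. The line import sqlContext.implicits._ puts an implicit conversion in scope that wraps an RDD of a case class (or other Product type) in a holder object, and toDF is defined on that holder; without the import the compiler finds no conversion to apply and reports that toDF is not a member of the RDD. A rough, self-contained sketch of the idea on Spark 1.x, with illustrative names (Record, ImplicitsDemo) that are not from this thread:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Top-level case class, so Spark can reflect on its fields.
case class Record(id: Int, name: String)

object ImplicitsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "implicits-demo")
    val sqlContext = new SQLContext(sc)

    val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))

    // Without the next line, rdd.toDF() does not compile: RDD has no such method.
    import sqlContext.implicits._

    // With it, the compiler implicitly wraps the RDD in a DataFrameHolder
    // and calls toDF on that wrapper.
    val df = rdd.toDF("id", "name")
    df.show()
  }
}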

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction

import grizzled.slf4j.Logger

case class LRAlgorithmParams(regParam: Double) extends Params

class LRAlgorithm(val ap: LRAlgorithmParams)
  extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  def train(sc: SparkContext, pd: PreparedData): LRModel = {

    // Import SQLContext for creating DataFrame.
    val sql: SQLContext = new SQLContext(sc)
    import sql.implicits._

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setThreshold(0.5)
      .setRegParam(ap.regParam)

    val labels: Seq[Double] = pd.categoryMap.keys.toSeq

    val data = labels.foldLeft(pd.transformedData.toDF)( // transform to Spark DataFrame
      // Add the different binary columns for each label.
      (data: DataFrame, label: Double) => {
        // function: multiclass labels --> binary labels
        val f: UserDefinedFunction = functions.udf((e: Double) => if (e == label) 1.0 else 0.0)

        data.withColumn(label.toInt.toString, f(data("label")))
      }
    )

    // Create a logistic regression model for each class.
    val lrModels: Seq[(Double, LREstimate)] = labels.map(
      label => {
        val lab = label.toInt.toString

        val fit = lr.setLabelCol(lab).fit(
          data.select(lab, "features")
        )

        // Return (label, feature coefficients, intercept term).
        (label, LREstimate(fit.weights.toArray, fit.intercept))
      }
    )

    new LRModel(
      tfIdf = pd.tfIdf,
      categoryMap = pd.categoryMap,
      lrModels = lrModels
    )
  }

  def predict(model: LRModel, query: Query): PredictedResult = {
    model.predict(query.text)
  }
}

case class LREstimate(
  coefficients: Array[Double],
  intercept: Double
)

class LRModel(
  val tfIdf: TFIDFModel,
  val categoryMap: Map[Double, String],
  val lrModels: Seq[(Double, LREstimate)]) extends Serializable {

  /** Enable vector inner product for prediction. */
  private def innerProduct(x: Array[Double], y: Array[Double]): Double = {
    x.zip(y).map(e => e._1 * e._2).sum
  }

  /** Define prediction rule. */
  def predict(text: String): PredictedResult = {
    val x: Array[Double] = tfIdf.transform(text).toArray

    // Logistic Regression binary formula for positive probability.
    // According to MLlib documentation, the class labeled 0 is used as the pivot.
    // Thus, we are using:
    // log(p1/p0) = log(p1/(1 - p1)) = b0 + xTb =: z
    // p1 = exp(z) * (1 - p1)
    // p1 * (1 + exp(z)) = exp(z)
    // p1 = exp(z)/(1 + exp(z))
    val pred = lrModels.map(
      e => {
        val z = scala.math.exp(innerProduct(e._2.coefficients, x) + e._2.intercept)
        (e._1, z / (1 + z))
      }
    ).maxBy(_._2)

    PredictedResult(categoryMap(pred._1), pred._2)
  }

  override def toString = s"LR model"
}

 

I am getting the same error in my code. Can you help me fix it?

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

Import the implicits, where sc is the SparkSession built as follows:

import org.apache.spark.sql.SparkSession

val sc = SparkSession
  .builder()
  .appName("demo")
  .master("local")
  .getOrCreate()

import sc.implicits._
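
Putting the pieces together, here is a minimal end-to-end sketch on Spark 2.x showing the same fix: a top-level case class plus the implicits import before calling toDF. The names (Person, ToDFExample) are illustrative, not from this thread:

import org.apache.spark.sql.SparkSession

// Defined at top level (not inside a method) so Spark can derive an encoder for it.
case class Person(name: String, age: Int)

object ToDFExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("toDF-demo")
      .master("local[*]")
      .getOrCreate()

    // Brings the RDD-to-DataFrame/Dataset conversions into scope.
    import spark.implicits._

    val rdd = spark.sparkContext.parallelize(Seq(Person("alice", 30), Person("bob", 25)))
    val df = rdd.toDF()   // compiles because of the import above
    df.show()

    spark.stop()
  }
}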