
Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

Solved

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor
Can you show me how you wrote the case class to define the schema, and how you use it in your method? Thanks so much.

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

Hi,

Can you share your program?

I am getting a single error, shown below:

[info] Compiling 1 Scala source to /home/sumeet/SimpleSparkProject/target/scala-2.11/classes...
[error] /home/sumeet/SimpleSparkProject/src/main/scala/SimpleApp.scala:16: value toDF is not a member of org.apache.spark.rdd.RDD[Auction]
[error] val auction = ebay.toDF()
[error] ^

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

object SimpleApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Simple App", "/usr/local/spark-1.4.0-incubating",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ebayText = sc.textFile("/home/sumeet/Desktop/useful huge sample data/ebay.csv")
    ebayText.first()

    case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
      bidderrate: Integer, openbid: Float, price: Float)

    val ebay = ebayText.map(_.split(",")).map(p =>
      Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat))
    ebay.first()
    ebay.count()

    val auction = ebay.toDF()
    auction.show()
  }
}
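A likely cause here, going by the compiler message: case class Auction is declared inside main. The toDF conversion from sqlContext.implicits._ needs a Scala TypeTag for the element type, and case classes defined inside a method don't get one, so the implicit never applies and the compiler reports that toDF is not a member of the RDD. Moving the definition to the top level, roughly as in this sketch (untested, trimmed to the relevant parts), is the usual fix:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Top level, so the compiler can generate the TypeTag that
// the toDF implicit conversion requires.
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
  bidderrate: Integer, openbid: Float, price: Float)

object SimpleApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Simple App")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ebayText = sc.textFile("/home/sumeet/Desktop/useful huge sample data/ebay.csv")
    val ebay = ebayText.map(_.split(",")).map(p =>
      Auction(p(0), p(1).toFloat, p(2).toFloat, p(3), p(4).toInt, p(5).toFloat, p(6).toFloat))

    val auction = ebay.toDF() // compiles once Auction is top level
    auction.show()
  }
}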

 

 

Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

Hi, thank you! It resolved the similar issue I was facing. However, could you please share why this is done, and what exactly the implicit does in this case? Reply appreciated. Sorry for reopening this post.
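In short: toDF is not actually a method on RDD. The line import sqlContext.implicits._ brings implicit conversions into scope that wrap an RDD of case-class instances in a holder type that does have toDF; without the import the compiler finds nothing to apply, hence "value toDF is not a member". A minimal sketch of the same enrichment pattern, with made-up names (Enriched and enrich are illustrations, not Spark's actual classes):

import scala.language.implicitConversions

object Enrichment {
  // The pattern Spark uses for toDF: an implicit conversion
  // adds a method that the original type does not have.
  class Enriched(n: Int) {
    def doubled: Int = n * 2
  }
  implicit def enrich(n: Int): Enriched = new Enriched(n)
}

object Demo extends App {
  // Without this import, 3.doubled fails to compile with
  // "value doubled is not a member of Int" -- the same shape
  // of error as the toDF one.
  import Enrichment._
  println(3.doubled) // prints 6
}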


Re: Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

New Contributor

package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction

import grizzled.slf4j.Logger

case class LRAlgorithmParams(regParam: Double) extends Params

class LRAlgorithm(val ap: LRAlgorithmParams)
  extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  def train(sc: SparkContext, pd: PreparedData): LRModel = {

    // Import SQLContext for creating DataFrame.
    val sql: SQLContext = new SQLContext(sc)
    import sql.implicits._

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setThreshold(0.5)
      .setRegParam(ap.regParam)

    val labels: Seq[Double] = pd.categoryMap.keys.toSeq

    val data = labels.foldLeft(pd.transformedData.toDF)( // transform to Spark DataFrame
      // Add the different binary columns for each label.
      (data: DataFrame, label: Double) => {
        // function: multiclass labels --> binary labels
        val f: UserDefinedFunction = functions.udf((e: Double) => if (e == label) 1.0 else 0.0)

        data.withColumn(label.toInt.toString, f(data("label")))
      }
    )

    // Create a logistic regression model for each class.
    val lrModels: Seq[(Double, LREstimate)] = labels.map(
      label => {
        val lab = label.toInt.toString

        val fit = lr.setLabelCol(lab).fit(
          data.select(lab, "features")
        )

        // Return (label, LREstimate(feature coefficients, intercept term)).
        (label, LREstimate(fit.weights.toArray, fit.intercept))
      }
    )

    new LRModel(
      tfIdf = pd.tfIdf,
      categoryMap = pd.categoryMap,
      lrModels = lrModels
    )
  }

  def predict(model: LRModel, query: Query): PredictedResult = {
    model.predict(query.text)
  }
}

case class LREstimate(
  coefficients: Array[Double],
  intercept: Double
)

class LRModel(
  val tfIdf: TFIDFModel,
  val categoryMap: Map[Double, String],
  val lrModels: Seq[(Double, LREstimate)]) extends Serializable {

  /** Enable vector inner product for prediction. */
  private def innerProduct(x: Array[Double], y: Array[Double]): Double = {
    x.zip(y).map(e => e._1 * e._2).sum
  }

  /** Define prediction rule. */
  def predict(text: String): PredictedResult = {
    val x: Array[Double] = tfIdf.transform(text).toArray

    // Logistic Regression binary formula for positive probability.
    // According to the MLlib documentation, the class labeled 0 is used as pivot.
    // Thus, we are using:
    // log(p1/p0) = log(p1/(1 - p1)) = b0 + xTb =: z
    // p1 = exp(z) * (1 - p1)
    // p1 * (1 + exp(z)) = exp(z)
    // p1 = exp(z)/(1 + exp(z))
    val pred = lrModels.map(
      e => {
        val z = scala.math.exp(innerProduct(e._2.coefficients, x) + e._2.intercept)
        (e._1, z / (1 + z))
      }
    ).maxBy(_._2)

    PredictedResult(categoryMap(pred._1), pred._2)
  }

  override def toString = s"LR model"
}

 

I am getting the same error in my code. Can you help me fix it?
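For anyone debugging this in similar code, three things are worth checking, going by the other replies here (a checklist, not a diagnosis of the program above): the element type of the RDD should be a case class (or other Product) defined at the top level rather than inside a method; import sqlContext.implicits._ has to appear after the SQLContext is created; and spark-sql must be on the compile classpath alongside spark-core. A minimal self-check that should compile and run under those assumptions (Spark 1.x; Point and ToDFCheck are made-up names):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Top-level case class, so the compiler can supply the TypeTag
// that the toDF implicit conversion requires.
case class Point(x: Double, y: Double)

object ToDFCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "ToDFCheck")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // must come after sqlContext exists

    // If this compiles, the toDF setup is correct; the problem is elsewhere.
    val df = sc.parallelize(Seq(Point(1.0, 2.0), Point(3.0, 4.0))).toDF()
    df.show()
    sc.stop()
  }
}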
