Member since 10-26-2017 · 1 Post · 0 Kudos Received · 0 Solutions
10-26-2017 07:35 AM
// src/main/scala/LRAlgorithm.scala
package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
// NOTE: `org.apache.spark.sql.UserDefinedFunction` does not exist in Spark 2.x (it moved to
// `org.apache.spark.sql.expressions.UserDefinedFunction`). The UDF type below is inferred
// instead of imported, so this file compiles against both Spark 1.6 and Spark 2.x.
import grizzled.slf4j.Logger

/** Hyper-parameters for [[LRAlgorithm]].
  *
  * @param regParam regularization strength passed to `LogisticRegression.setRegParam`.
  */
case class LRAlgorithmParams(regParam: Double) extends Params

/** One-vs-rest logistic regression text classifier.
  *
  * Trains one binary Spark ML `LogisticRegression` per category label and keeps only each
  * fitted model's coefficients and intercept, so [[LRModel.predict]] scores locally.
  */
class LRAlgorithm(val ap: LRAlgorithmParams)
    extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  /** Fits one binary logistic regression model per label in `pd.categoryMap`.
    *
    * @param sc Spark context used to build the training DataFrame
    * @param pd prepared data; `pd.transformedData.toDF` is expected to expose `label` and
    *           `features` columns (both are referenced by name below)
    * @return an [[LRModel]] holding the TF-IDF model, the category map and the per-label estimates
    */
  def train(sc: SparkContext, pd: PreparedData): LRModel = {
    // Import SQLContext implicits for creating the DataFrame.
    val sql: SQLContext = new SQLContext(sc)
    import sql.implicits._

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setThreshold(0.5)
      .setRegParam(ap.regParam)

    // Materialize strictly: every fit below mutates the shared `lr` via setLabelCol, so the
    // label sequence must not be evaluated lazily.
    val labels: Seq[Double] = pd.categoryMap.keys.toVector

    // NOTE(review): in Spark 2.x, spark.ml estimators require `features` to be an
    // `org.apache.spark.ml.linalg.Vector`. If PreparedData holds mllib LabeledPoints, fitting
    // fails at runtime; convert first, e.g. `MLUtils.convertVectorColumnsToML(df, "features")`.
    // Confirm against PreparedData.
    //
    // Add one binary (1.0 / 0.0) column per label, named after the integer label, so each
    // one-vs-rest model can use it as its label column.
    val data = labels.foldLeft(pd.transformedData.toDF) { (df: DataFrame, label: Double) =>
      val isLabel = functions.udf((e: Double) => if (e == label) 1.0 else 0.0)
      df.withColumn(label.toInt.toString, isLabel(df("label")))
    }

    // Fit a binary logistic regression model for each class.
    val lrModels: Seq[(Double, LREstimate)] = labels.map { label =>
      val lab = label.toInt.toString
      val fit = lr.setLabelCol(lab).fit(data.select(lab, "features"))
      // `weights` was deprecated in Spark 1.6 and removed in 2.0; `coefficients` works in both.
      (label, LREstimate(fit.coefficients.toArray, fit.intercept))
    }

    new LRModel(
      tfIdf = pd.tfIdf,
      categoryMap = pd.categoryMap,
      lrModels = lrModels
    )
  }

  /** Delegates to [[LRModel.predict]] on the query text. */
  def predict(model: LRModel, query: Query): PredictedResult = {
    model.predict(query.text)
  }
}

/** Parameters of one fitted binary logistic regression model.
  *
  * @param coefficients feature coefficients
  * @param intercept    intercept term
  */
case class LREstimate(
  coefficients: Array[Double],
  intercept: Double
)

/** Locally evaluable one-vs-rest logistic regression model.
  *
  * @param tfIdf       model used to turn raw text into a feature vector
  * @param categoryMap maps a numeric label to its category name
  * @param lrModels    (label, estimate) pairs, one per class
  */
class LRModel(
    val tfIdf: TFIDFModel,
    val categoryMap: Map[Double, String],
    val lrModels: Seq[(Double, LREstimate)]) extends Serializable {

  /** Vector inner product; `zip` ignores trailing elements of the longer array. */
  private def innerProduct(x: Array[Double], y: Array[Double]): Double =
    x.zip(y).map(e => e._1 * e._2).sum

  /** Numerically stable logistic function 1 / (1 + exp(-z)).
    *
    * The algebraically equal exp(z) / (1 + exp(z)) yields Infinity / Infinity = NaN once
    * exp(z) overflows (z > ~709). This form saturates cleanly to 1.0 or 0.0 instead.
    */
  private def sigmoid(z: Double): Double = 1.0 / (1.0 + scala.math.exp(-z))

  /** Scores `text` against every binary model and returns the most probable category.
    *
    * For each class, with z = b0 + x^T b:
    *   log(p1 / (1 - p1)) = z  =>  p1 = 1 / (1 + exp(-z))
    *
    * NOTE(review): assumes `lrModels` is non-empty (`maxBy` throws otherwise).
    */
  def predict(text: String): PredictedResult = {
    val x: Array[Double] = tfIdf.transform(text).toArray
    val (bestLabel, bestProb) = lrModels.map { case (label, estimate) =>
      (label, sigmoid(innerProduct(estimate.coefficients, x) + estimate.intercept))
    }.maxBy(_._2)
    PredictedResult(categoryMap(bestLabel), bestProb)
  }

  override def toString = s"LR model"
}

// Forum question accompanying this code: "Getting same error in my code. Can you help me how to fix it?"
... View more