Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

Run RDD operations on SQL Dataframe in 1.3.1

Guru

I am trying to run a regression on a dataset, but I ran into two issues:

1. When I try to split the dataset, which I imported from a text file, I get the following error:

java.lang.NumberFormatException: For input string: "[34"

That's because the text file has the data in the format: [x, y, z ....] [a, b, c ....]

2. So I tried to use Spark SQL to create a DataFrame that I can then convert to an RDD using xRDD = x.rdd, but I get a type mismatch error:

found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]

How should I resolve this?
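For the first issue, the brackets have to be stripped before the values are converted to numbers, since "[34" is not a valid Double. Here is a minimal sketch in plain Scala, assuming one bracketed record per line with the label first and the features after it. The helper name parseBracketed and that column layout are assumptions, not something stated in the question.

```scala
// Assumption: each line holds one record such as "[34, 1.5, 2.0]",
// with the label in the first position and the features after it.
def parseBracketed(line: String): (Double, Array[Double]) = {
  val values = line.trim
    .stripPrefix("[")   // drop the leading bracket that caused "[34" to fail
    .stripSuffix("]")   // drop the trailing bracket
    .split(",")
    .map(_.trim.toDouble)
  (values.head, values.tail)
}
```

Each (label, features) pair could then be wrapped as LabeledPoint(label, Vectors.dense(features)) after reading the file with sc.textFile.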

1 ACCEPTED SOLUTION

It looks like the expected format for labeled points is different from what you have. I ran the statement below to see the format. I guess each row should be in the format shown in the output below.

scala> val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[1.0,0.0,3.0])
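For the type mismatch, one option is to map each Row to a LabeledPoint explicitly. This is only a sketch: it assumes column 0 of the DataFrame holds the label, the remaining columns are features, and every column is a non-null Double. The helper name toLabeledPoints is made up for illustration.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Assumption: column 0 is the label, columns 1..n-1 are the features,
// and all of them are stored as non-null Double values.
def toLabeledPoints(rows: RDD[Row]): RDD[LabeledPoint] =
  rows.map { row =>
    val features = (1 until row.length).map(i => row.getDouble(i)).toArray
    LabeledPoint(row.getDouble(0), Vectors.dense(features))
  }
```

With the DataFrame x from the question, this would be called as val xRDD = toLabeledPoints(x.rdd). If the columns have other types (for example Int or String), getDouble will fail and the conversion has to be adapted.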

4 REPLIES

New Member

Can you please post the full code and error log?

Guru

@Ofer Mendelevith

I think it's an issue with LabeledPoint. It's expecting a LabeledPoint but not getting one.

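import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.util.MLUtils

// Load the data as an RDD[LabeledPoint]
val examples = MLUtils.loadLabeledData(sc, "hdfs:///user/zeppelin/las_demo/part-00000").cache()

// Split into training and test sets (80/20)
val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
val test = splits(1).cache()

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")

// Train logistic regression with SGD and L2 regularization
val updater = new SquaredL2Updater()
val model = {
  val algorithm = new LogisticRegressionWithSGD()
  algorithm.optimizer.setNumIterations(200).setStepSize(1.0).setUpdater(updater).setRegParam(0.1)
  algorithm.run(training).clearThreshold()
}

// Score the test set and pair each prediction with its label
val rprediction = model.predict(test.map(_.features))
val rpredictionAndLabel = rprediction.zip(test.map(_.label))

val rmetrics = new BinaryClassificationMetrics(rpredictionAndLabel)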

The error is as follows:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details
examples: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[52] at map at MLUtils.scala:214
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(PartitionwiseSampledRDD[53] at randomSplit at <console>:72, PartitionwiseSampledRDD[54] at randomSplit at <console>:72)
training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = PartitionwiseSampledRDD[53] at randomSplit at <console>:72
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = PartitionwiseSampledRDD[54] at randomSplit at <console>:72
numTraining: Long = 19589
numTest: Long = 4889
Training: 19589, test: 4889.
updater: org.apache.spark.mllib.optimization.SquaredL2Updater = org.apache.spark.mllib.optimization.SquaredL2Updater@3b9284cd
org.apache.spark.SparkException: Input validation failed.
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:210)
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:103)
    at $iwC$$iwC$$iwC.<init>(<console>:105)
    at $iwC$$iwC.<init>(<console>:107)
    at $iwC.<init>(<console>:109)
    at <init>(<console>:111)
    at .<init>(<console>:115)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:655)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:620)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:613)
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
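One thing that may be worth checking for "Input validation failed": as far as I know, LogisticRegressionWithSGD validates that every label is 0 or 1 before training, so data with continuous regression targets would be rejected at algorithm.run. The sketch below lists any labels that would fail that check. The helper name nonBinaryLabels is hypothetical, and whether this is the actual cause here is an assumption.

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Returns the distinct labels that are neither 0.0 nor 1.0.
// An empty result means the labels are binary, so the cause lies elsewhere.
def nonBinaryLabels(data: RDD[LabeledPoint]): Array[Double] =
  data.map(_.label)
    .distinct()
    .filter(label => label != 0.0 && label != 1.0)
    .collect()
```

Calling nonBinaryLabels(training) on the split above would show the offending values. If the target really is continuous, a regression algorithm such as LinearRegressionWithSGD may be a better fit than logistic regression.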

Master Mentor

@Vedant Jain can you accept the best answer to close this thread or post your solution?