Run RDD operations on a Spark SQL DataFrame in Spark 1.3.1

Guru

I am trying to run regression on a dataset, but I ran into 2 issues:

1. When I try to split the dataset, which I imported from a text file, I get the following error:

java.lang.NumberFormatException: For input string: "[34"

That's because the text file has the data in the format: [x, y, z ....] [a, b, c ....]

2. So I tried using Spark SQL to create a DataFrame that I can then convert to an RDD with xRDD = x.rdd, but I get a type mismatch error:

found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]

How should I resolve this?

1 ACCEPTED SOLUTION

New Contributor

It looks like the expected format for labeled points is different from what you have. I ran the statement below to understand the format. I guess each row should be in the format shown below.

scala> val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[1.0,0.0,3.0])
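To connect this to the NumberFormatException in the question: calling toDouble on a token such as "[34" fails because of the leading bracket, so the brackets have to be stripped before the numbers are parsed. The following is only a minimal sketch in plain Scala, not a confirmed fix. BracketedParser is a hypothetical helper, and it assumes that the first value in each bracketed record is the label, which the question does not state.

```scala
// Hypothetical helper (not part of Spark): pulls every bracketed record out of
// a chunk of text and parses it into a (label, features) pair.
// ASSUMPTION: the first value of each record is the label.
object BracketedParser {
  // Matches "[ ... ]" and captures everything between the brackets.
  private val Record = """\[([^\]]*)\]""".r

  def parse(text: String): List[(Double, Array[Double])] =
    Record.findAllMatchIn(text).toList.flatMap { m =>
      val values = m.group(1)
        .split(",")
        .map(_.trim)
        .filter(_.nonEmpty)
        .map(_.toDouble)
      // Empty records such as "[]" are skipped instead of raising an error.
      values.headOption.map(label => (label, values.tail))
    }
}

// Untested wiring sketch for spark-shell, where `path` is a placeholder:
//   import org.apache.spark.mllib.linalg.Vectors
//   import org.apache.spark.mllib.regression.LabeledPoint
//   val points = sc.textFile(path)
//     .flatMap(BracketedParser.parse)
//     .map { case (label, features) => LabeledPoint(label, Vectors.dense(features)) }
```

The same idea applies to the DataFrame route: x.rdd yields an RDD[Row], so each Row still has to be mapped to a LabeledPoint (for example by reading its columns with row.getDouble(i)) before MLlib will accept it.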


4 REPLIES 4

Explorer

Can you please post the full code and error log?

Guru

@Ofer Mendelevith

I think it's an issue with LabeledPoint. It's expecting labeled points but not getting them.

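import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.util.MLUtils

// Load the data as an RDD[LabeledPoint]; this loader expects lines of the form "label,f1 f2 ...".
val examples = MLUtils.loadLabeledData(sc, "hdfs:///user/zeppelin/las_demo/part-00000").cache()

// 80/20 split into training and test sets.
val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
val test = splits(1).cache()

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")

val updater = new SquaredL2Updater()
val model = {
  val algorithm = new LogisticRegressionWithSGD()
  algorithm.optimizer.setNumIterations(200).setStepSize(1.0).setUpdater(updater).setRegParam(0.1)
  // clearThreshold() makes predict return raw scores instead of 0/1 labels.
  algorithm.run(training).clearThreshold()
}

val rprediction = model.predict(test.map(_.features))
// Pair each score with the label from the same test split.
val rpredictionAndLabel = rprediction.zip(test.map(_.label))

val rmetrics = new BinaryClassificationMetrics(rpredictionAndLabel)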

The error is as follows:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details
examples: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[52] at map at MLUtils.scala:214
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(PartitionwiseSampledRDD[53] at randomSplit at <console>:72, PartitionwiseSampledRDD[54] at randomSplit at <console>:72)
training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = PartitionwiseSampledRDD[53] at randomSplit at <console>:72
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = PartitionwiseSampledRDD[54] at randomSplit at <console>:72
numTraining: Long = 19589
numTest: Long = 4889
Training: 19589, test: 4889.
updater: org.apache.spark.mllib.optimization.SquaredL2Updater = org.apache.spark.mllib.optimization.SquaredL2Updater@3b9284cd
org.apache.spark.SparkException: Input validation failed.
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:210)
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:103)
    at $iwC$$iwC$$iwC.<init>(<console>:105)
    at $iwC$$iwC.<init>(<console>:107)
    at $iwC.<init>(<console>:109)
    at <init>(<console>:111)
    at .<init>(<console>:115)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:655)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:620)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:613)
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
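One possible reading of "Input validation failed": the log shows that the data loaded as an RDD[LabeledPoint] and that the exception is raised inside algorithm.run. In MLlib releases of this era, LogisticRegressionWithSGD validates that every label is 0 or 1 before training, so data prepared for regression with continuous labels would be rejected at that point. Whether that is what happened here cannot be confirmed from the post. The sketch below shows the check in plain Scala. Example and LabelCheck are hypothetical stand-ins so that it runs without Spark.

```scala
// Hypothetical stand-in for org.apache.spark.mllib.regression.LabeledPoint,
// used only so this sketch runs without Spark on the classpath.
final case class Example(label: Double, features: Vector[Double])

object LabelCheck {
  // Returns the distinct labels that are neither 0.0 nor 1.0, i.e. the values
  // a binary-label validator would reject.
  def nonBinaryLabels(data: Seq[Example]): Seq[Double] =
    data.map(_.label).filter(l => l != 0.0 && l != 1.0).distinct
}

// Untested equivalent on the real RDD from the post:
//   training.map(_.label).distinct().take(10).foreach(println)
```

If the labels turn out to be continuous, a regression algorithm such as LinearRegressionWithSGD is probably closer to the stated goal than a binary classifier.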

Master Mentor

@Vedant Jain, can you accept the best answer to close this thread, or post your own solution?