Support Questions

Find answers, ask questions, and share your expertise

Spark/Scala Error: value toDF is not a member of org.apache.spark.rdd.RDD

Hi all,

I am trying to create a DataFrame of a text file which gives me error: "value toDF is not a member of org.apache.spark.rdd.RDD"

 

The only solution I can find online is to import SQLContext.implicits._ which in trun throws "not found: value SQLContext"

 

I googled this new error but couldn't find anything. The funny part is that the piece of code I am using works in Spark-Shell, but fails when I try to build it using sbt package

I am suing Cloudera's QuickStart VM and My Spark Version is 1.3.0 and my Scala Version: 2.10.4 .

 

Any help is highly appreciated,

Cheers.

 

Here comes my piece of code:

 

 

import...........

import SQLContext.implicits._

...

class Class_1() extends Runnable {
val conf = new SparkConf().setAppName("TestApp")
val sc = new SparkContext(conf)

val sqlContext= new org.apache.spark.sql.SQLContext(sc)
var fDimCustomer = sc.textFile("DimCustomer.txt")

 

def loadData(fileName:String) {

fDimCustomer = sc.textFile("DimCustomer.txt")

case class DimC(ID:Int, Name:String)
var dimCustomer1 = fDimCustomer.map(_.split(',')).map(r=>DimC(r(0).toInt,r(1))).toDF
dimCustomer1.registerTempTable("Cust_1")

val customers = sqlContext.sql("select * from Cust_1")
customers.show()

}

......

1 ACCEPTED SOLUTION

Ok, I finally fixed the issue. 2 things needed to be done:

 

1- Import implicits:

      Note that this should be done only after an instance of org.apache.spark.sql.SQLContext is created. It should be written as:

      val sqlContext= new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

 

 

2- Move case class outside of the method:

      case class, by use of which you define the schema of the DataFrame, should be defined outside of the method needing it. You can read more about it here:

      https://issues.scala-lang.org/browse/SI-6649

 

Cheers.

View solution in original post

14 REPLIES 14

Master Collaborator

I think you're missing the package name. org.apache.spark.sql.SQLContext...

I am having it as
val sqlContext= new org.apache.spark.sql.SQLContext(sc)

How should it be?

Master Collaborator

I meant in the import; you're missing the implicits, I think.

 

import org.apache.spark.sql.SQLContext.implicits._

I tried that, still the same errors..

This is the content of my .sbt file:

 

name := "WatchApp"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq (
"org.apache.spark" %% "spark-core" % "1.3.0",
"org.apache.spark" %% "spark-sql" % "1.3.0"
)

 

 

Can you spot any mistakes in it?

Master Collaborator

Ah, I think I'm mistaken. Try this; note the capitalization:

 

import sqlContext.implicits._

I tried every combination of capitalizations. None of them works.

 

I reckon the key to notice is that, when I use "import sqlContext.implicits._" in spark-shell, and then run:

 

case class DimC(ID:Int, Name:String, City:String, EffectiveFrom:Int, EffectiveTo:Int)

 

It throws the error below. But it works perfectly the second time and creates the class. 

 

 

 

15/07/21 04:48:12 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/07/21 04:48:12 INFO ObjectStore: ObjectStore, initialize called
15/07/21 04:48:12 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/flume-ng/lib/datanucleus-api-jdo-3.2.1.jar."
15/07/21 04:48:12 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/flume-ng/lib/datanucleus-rdbms-3.2.1.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/hive/lib/datanucleus-rdbms-3.2.9.jar."
15/07/21 04:48:12 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/hive/lib/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/flume-ng/lib/datanucleus-core-3.2.2.jar."
15/07/21 04:48:12 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/07/21 04:48:12 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/07/21 04:48:13 WARN HiveMetaStore: Retrying creating default database after error: Error creating transactional connection factory
javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:56)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:65)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:579)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:557)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:606)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:448)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5601)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:193)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1486)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:64)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:74)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2841)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2860)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:453)
at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.<init>(HiveContext.scala:373)
at org.apache.spark.sql.hive.HiveContext.executePlan(HiveContext.scala:80)
at org.apache.spark.sql.hive.HiveContext.executePlan(HiveContext.scala:49)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:319)
at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:254)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $line25.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $line25.$read$$iwC$$iwC$$iwC.<init>(<console>:45)
at $line25.$read$$iwC$$iwC.<init>(<console>:47)
at $line25.$read$$iwC.<init>(<console>:49)
at $line25.$read.<init>(<console>:51)
at $line25.$read$.<init>(<console>:55)
at $line25.$read$.<clinit>(<console>)
at $line25.$eval$.<init>(<console>:7)
at $line25.$eval$.<clinit>(<console>)
at $line25.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
NestedThrowablesStackTrace:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:282)
at org.datanucleus.store.AbstractStoreManager.<init>(AbstractStoreManager.java:240)
at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:286)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:56)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:65)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:579)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:557)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:606)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:448)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5601)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:193)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1486)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:64)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:74)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2841)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2860)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:453)
at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.<init>(HiveContext.scala:373)
at org.apache.spark.sql.hive.HiveContext.executePlan(HiveContext.scala:80)
at org.apache.spark.sql.hive.HiveContext.executePlan(HiveContext.scala:49)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:319)
at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:254)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line25.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $line25.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $line25.$read$$iwC$$iwC$$iwC.<init>(<console>:45)
at $line25.$read$$iwC$$iwC.<init>(<console>:47)
at $line25.$read$$iwC.<init>(<console>:49)
at $line25.$read.<init>(<console>:51)
at $line25.$read$.<init>(<console>:55)
at $line25.$read$.<clinit>(<console>)
at $line25.$eval$.<init>(<console>:7)
at $line25.$eval$.<clinit>(<console>)
at $line25.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ExceptionInInitializerError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
at org.datanucleus.store.rdbms.connectionpool.BoneCPConnectionPoolFactory.createConnectionPool(BoneCPConnectionPoolFactory.java:54)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:85)
... 114 more
Caused by: java.lang.SecurityException: sealing violation: package org.apache.derby.impl.services.locks is sealed
at java.net.URLClassLoader.getAndVerifyPackage(URLClassLoader.java:388)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:417)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at org.apache.derby.impl.services.monitor.BaseMonitor.getImplementations(Unknown Source)
at org.apache.derby.impl.services.monitor.BaseMonitor.getDefaultImplementations(Unknown Source)
at org.apache.derby.impl.services.monitor.BaseMonitor.runWithState(Unknown Source)
at org.apache.derby.impl.services.monitor.FileMonitor.<init>(Unknown Source)
at org.apache.derby.iapi.services.monitor.Monitor.startMonitor(Unknown Source)
at org.apache.derby.iapi.jdbc.JDBCBoot.boot(Unknown Source)
at org.apache.derby.jdbc.EmbeddedDriver.boot(Unknown Source)
at org.apache.derby.jdbc.EmbeddedDriver.<clinit>(Unknown Source)
... 124 more
15/07/21 04:48:13 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/07/21 04:48:13 INFO ObjectStore: ObjectStore, initialize called
15/07/21 04:48:13 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/flume-ng/lib/datanucleus-api-jdo-3.2.1.jar."
15/07/21 04:48:13 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/flume-ng/lib/datanucleus-rdbms-3.2.1.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/hive/lib/datanucleus-rdbms-3.2.9.jar."
15/07/21 04:48:13 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/usr/lib/hive/lib/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/usr/lib/flume-ng/lib/datanucleus-core-3.2.2.jar."
15/07/21 04:48:13 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/07/21 04:48:13 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

Anybody else had this problem? Please help!

Ok, I finally fixed the issue. 2 things needed to be done:

 

1- Import implicits:

      Note that this should be done only after an instance of org.apache.spark.sql.SQLContext is created. It should be written as:

      val sqlContext= new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

 

 

2- Move case class outside of the method:

      case class, by use of which you define the schema of the DataFrame, should be defined outside of the method needing it. You can read more about it here:

      https://issues.scala-lang.org/browse/SI-6649

 

Cheers.

New Contributor
Can you show me how you write case class to define schema and how to use it in your method? Thanks so much

New Contributor

Hi,

 

Can you shae your program.

 

I am getting one single error mentioned below:-

 

[info] Compiling 1 Scala source to /home/sumeet/SimpleSparkProject/target/scala-2.11/classes...
[error] /home/sumeet/SimpleSparkProject/src/main/scala/SimpleApp.scala:16: value toDF is not a member of org.apache.spark.rdd.RDD[Auction]
[error] val auction = ebay.toDF()
[error] ^

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
object SimpleApp {
def main(args: Array[String]) {
val sc = new SparkContext("local", "Simple App", "/usr/local/spark-1.4.0-incubating",
List("target/scala-2.10/simple-project_2.10-1.0.jar"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ebayText = sc.textFile("/home/sumeet/Desktop/useful huge sample data/ebay.csv")
ebayText.first()
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float)
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat))
ebay.first()
ebay.count()
val auction = ebay.toDF()
auction.show()
}
}

 

 

New Contributor

Hi, Thank You! It resolved the similar issue that I was facing. However, coulc you please share your knowledge on why is this done? And what exactly implicit does in this case. Reply appreciated. Sorry for reopening this post.

New Contributor

package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction

import grizzled.slf4j.Logger

case class LRAlgorithmParams(regParam: Double) extends Params

class LRAlgorithm(val ap: LRAlgorithmParams)
extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

@transient lazy val logger = Logger[this.type]

def train(sc: SparkContext, pd: PreparedData): LRModel = {

// Import SQLContext for creating DataFrame.
val sql: SQLContext = new SQLContext(sc)
import sql.implicits._

val lr = new LogisticRegression()
.setMaxIter(10)
.setThreshold(0.5)
.setRegParam(ap.regParam)

val labels: Seq[Double] = pd.categoryMap.keys.toSeq

val data = labels.foldLeft(pd.transformedData.toDF)( //transform to Spark DataFrame
// Add the different binary columns for each label.
(data: DataFrame, label: Double) => {
// function: multiclass labels --> binary labels
val f: UserDefinedFunction = functions.udf((e : Double) => if (e == label) 1.0 else 0.0)

data.withColumn(label.toInt.toString, f(data("label")))
}
ubuntu@ip-172-20-9-118:/spark/tracxn/predictionio/classification/isCompany$ cat src/main/scala/LRAlgorithm.scala
package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction

import grizzled.slf4j.Logger

case class LRAlgorithmParams(regParam: Double) extends Params

class LRAlgorithm(val ap: LRAlgorithmParams)
extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

@transient lazy val logger = Logger[this.type]

def train(sc: SparkContext, pd: PreparedData): LRModel = {

// Import SQLContext for creating DataFrame.
val sql: SQLContext = new SQLContext(sc)
import sql.implicits._

val lr = new LogisticRegression()
.setMaxIter(10)
.setThreshold(0.5)
.setRegParam(ap.regParam)

val labels: Seq[Double] = pd.categoryMap.keys.toSeq

val data = labels.foldLeft(pd.transformedData.toDF)( //transform to Spark DataFrame
// Add the different binary columns for each label.
(data: DataFrame, label: Double) => {
// function: multiclass labels --> binary labels
val f: UserDefinedFunction = functions.udf((e : Double) => if (e == label) 1.0 else 0.0)

data.withColumn(label.toInt.toString, f(data("label")))
}
)

// Create a logistic regression model for each class.
val lrModels : Seq[(Double, LREstimate)] = labels.map(
label => {
val lab = label.toInt.toString

val fit = lr.setLabelCol(lab).fit(
data.select(lab, "features")
)

// Return (label, feature coefficients, and intercept term.
(label, LREstimate(fit.weights.toArray, fit.intercept))

}
)

new LRModel(
tfIdf = pd.tfIdf,
categoryMap = pd.categoryMap,
lrModels = lrModels
)
}

def predict(model: LRModel, query: Query): PredictedResult = {
model.predict(query.text)
}
}

case class LREstimate (
coefficients : Array[Double],
intercept : Double
)

class LRModel(
val tfIdf: TFIDFModel,
val categoryMap: Map[Double, String],
val lrModels: Seq[(Double, LREstimate)]) extends Serializable {

/** Enable vector inner product for prediction. */
private def innerProduct (x : Array[Double], y : Array[Double]) : Double = {
x.zip(y).map(e => e._1 * e._2).sum
}

/** Define prediction rule. */
def predict(text: String): PredictedResult = {
val x: Array[Double] = tfIdf.transform(text).toArray

// Logistic Regression binary formula for positive probability.
// According to MLLib documentation, class labeled 0 is used as pivot.
// Thus, we are using:
// log(p1/p0) = log(p1/(1 - p1)) = b0 + xTb =: z
// p1 = exp(z) * (1 - p1)
// p1 * (1 + exp(z)) = exp(z)
// p1 = exp(z)/(1 + exp(z))
val pred = lrModels.map(
e => {
val z = scala.math.exp(innerProduct(e._2.coefficients, x) + e._2.intercept)
(e._1, z / (1 + z))
}
).maxBy(_._2)

PredictedResult(categoryMap(pred._1), pred._2)
}

override def toString = s"LR model"
}

 

Getting same error in my code . Can you help me how to fix it 

New Contributor

Import implicit

 

 where sc=

val sc = SparkSession
.builder()
.appName("demo")
.master("local")
.getOrCreate()

import sc.implicits._