Spark-submit to deploy the jar to Spark - java.lang.ClassNotFoundException

Hi, I'm following this tutorial:

https://es.hortonworks.com/tutorial/deploying-machine-learning-models-using-spark-structured-streami...

I'm using HDP 2.5.0.0-1245, with Spark 1.6.2 and Scala 2.10.5.

I have reached the step of the tutorial that says: "Then use spark-submit to deploy the jar to Spark."

I am trying to submit the job, a jar file located in target/main/scala, with the following command:

/usr/hdp/current/spark2-client/bin/spark-submit --class "main.scala.Collect" --master local[4] ./SentimentAnalysis-assembly-2.0.0.jar

Everything goes well except for the following errors:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:145)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:78)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:78)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:195)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:79)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:142)
        at main.scala.Collect$.main(Collect.scala:61)
        at main.scala.Collect.main(Collect.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:130)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:130)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:130)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:130)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:130)
        ... 18 more

My build.sbt file is:

name := "SentimentAnalysis"

version := "2.0.0"

scalaVersion := "2.10.5"//"2.10.4"//

libraryDependencies ++= {
  val sparkVer = "2.1.0"//"1.6.1"//
  Seq(
    "org.apache.spark"     %% "spark-core"              % sparkVer % "provided" withSources(),
    "org.apache.spark"     %% "spark-mllib"             % sparkVer % "provided" withSources(),
    "org.apache.spark"     %% "spark-sql"               % sparkVer withSources(),
    "org.apache.spark"     %% "spark-streaming"         % sparkVer % "provided" withSources(),
    "org.apache.spark"     %% "spark-streaming-kafka-0-10" % sparkVer withSources(),
    "org.apache.spark"     %% "spark-sql-kafka-0-10" % sparkVer withSources(),
    "org.apache.kafka"     %% "kafka" % "0.10.0" withSources(),
    "com.typesafe" % "config" % "1.3.1",
    "com.google.code.gson" % "gson" % "2.8.0"
  )
}


assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", xs @ _*)      => MergeStrategy.first
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
  case PathList("javax", "xml", xs @ _*)      => MergeStrategy.first
  case PathList("com", "esotericsoftware", xs @ _*)      => MergeStrategy.first
  case PathList("com", "google", xs @ _*)      => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

1 REPLY

@David Sandoval The code from the link you shared uses Spark 2 and streams from Kafka using Structured Streaming. Structured Streaming has been in Technical Preview (TP) since HDP 2.6.3. You are running HDP 2.5 with Spark 1.6, so you should follow the steps in the link on HDP 2.6.3 or later with Spark 2.
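
Separately from the version mismatch, one thing in the posted build.sbt may also be worth checking. The rule that discards everything under META-INF also drops META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, the file through which spark-sql-kafka-0-10 registers the short name "kafka". When that file is missing from the assembly jar, Spark falls back to looking for a class named kafka.DefaultSource, which matches the "Caused by" line in the trace. Below is a minimal sketch of a merge strategy that keeps service files, assuming the same sbt-assembly syntax as in the question. It is untested against this project, and the catch-all rule at the end is an assumption of mine, not something taken from the tutorial.

```scala
// build.sbt fragment (sbt-assembly). A sketch under the assumptions above,
// not the tutorial's configuration.
assemblyMergeStrategy in assembly := {
  // Keep service registrations and concatenate duplicates, e.g.
  // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  // Discard the remaining META-INF entries (manifests, signature files)
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  // Assumption: resolve every other conflict by taking the first copy
  case _                                         => MergeStrategy.first
}
```

The order of the cases matters: the first matching pattern wins, so the more specific "services" rule has to come before the general META-INF rule, and a catch-all case must come last.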

HTH
