Created 05-22-2018 02:38 PM
Hi, I'm doing the following tutoria.
https://es.hortonworks.com/tutorial/deploying-machine-learning-models-using-spark-structured-streami...
I'm using the HDP version is HDP-2.5.0.0-1245, the spark version is 1.6.2 and the scala version is 2.10.5.
I have reached this point of the tutorial: Then use spark-submit to deploy the jar to Spark.
I am trying to submit a job which is in target/main/scala, which is a jar file, with the following lines:
/usr/hdp/current/spark2-client/bin/spark-submit --class "main.scala.Collect" --master local[4] ./SentimentAnalysis-assembly-2.0.0.jar
All goes well, except the following errors:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:145) at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:78) at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:78) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:195) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:79) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:79) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:142) at main.scala.Collect$.main(Collect.scala:61) at main.scala.Collect.main(Collect.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:130) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:130) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:130) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:130) at scala.util.Try.orElse(Try.scala:84) at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:130) ... 18 more
My build.sbt file is:
name := "SentimentAnalysis" version := "2.0.0" scalaVersion := "2.10.5"//"2.10.4"// libraryDependencies ++= { val sparkVer = "2.1.0"//"1.6.1"// Seq( "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(), "org.apache.spark" %% "spark-mllib" % sparkVer % "provided" withSources(), "org.apache.spark" %% "spark-sql" % sparkVer withSources(), "org.apache.spark" %% "spark-streaming" % sparkVer % "provided" withSources(), "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVer withSources(), "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVer withSources(), "org.apache.kafka" %% "kafka" % "0.10.0" withSources(), "com.typesafe" % "config" % "1.3.1", "com.google.code.gson" % "gson" % "2.8.0" ) } assemblyMergeStrategy in assembly := { case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first case PathList("javax", "xml", xs @ _*) => MergeStrategy.first case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.first case PathList("com", "google", xs @ _*) => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) }
Created 05-22-2018 07:43 PM
@David Sandoval The code from the link you shared is using Spark 2 and is streaming from Kafka using Structured Streaming. Structured Streaming is TP since HDP 2.6.3. You are running HDP 2.5 with spark 1.6, you should try following link steps using 2.6.3 onwards with spark 2.
HTH
*** If you found this answer addressed your question, please take a moment to login and click the "accept" link on the answer.