Created 01-20-2016 02:20 PM
I am completely blank on this error; I've been struggling with it for two days. My Kafka producer is working fine, emitting a ClickEvent every second, and I have confirmed that the events are being posted to the Kafka topic in Avro format.
Now I want to read them back in Spark Streaming (the code is at the end). I get the error "java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$". I have searched everywhere for WriteAheadLogUtils and found nothing. Please help me resolve this issue.
16/01/20 14:17:37 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox.hortonworks.com:38373/user/HeartbeatReceiver
16/01/20 14:17:37 INFO NettyBlockTransferService: Server created on 60697
16/01/20 14:17:37 INFO BlockManagerMaster: Trying to register BlockManager
16/01/20 14:17:37 INFO BlockManagerMasterActor: Registering block manager localhost:60697 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 60697)
16/01/20 14:17:37 INFO BlockManagerMaster: Registered BlockManager
16/01/20 14:17:39 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error: application failed with exception
java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
    at streamingAvroConsumer$delayedInit$body.apply(streamingAvroConsumer.scala:44)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at streamingAvroConsumer$.main(streamingAvroConsumer.scala:18)
    at streamingAvroConsumer.main(streamingAvroConsumer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:367)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.util.WriteAheadLogUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 18 more
Code is below
import events.avro.ClickEvent
import kafka.serializer.DefaultDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object streamingAvroConsumer extends App {
  println("Initializing App")

  val numThreads = "1"
  val topics = "testing2"
  val sparkConf = new SparkConf().setAppName("WindowClickCount").setMaster("local[2]")

  // Slide duration of ReduceWindowedDStream must be a multiple of the parent DStream's,
  // and we chose 2 seconds for the reduced window stream
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // Because we're using .reduceByKeyAndWindow, we need to persist it to disk
  ssc.checkpoint("./checkpointDir")

  val kafkaConf = Map(
    "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
    "zookeeper.connect" -> "sandbox.hortonworks.com:2181",
    "group.id" -> "kafka-spark-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000")

  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

  // Create a new stream which can decode byte arrays. For this exercise, the incoming
  // stream only contains user and product Ids
  val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)

  val mappedUserName = lines.transform { rdd =>
    rdd.map { bytes =>
      AvroUtil.clickEventDecode(bytes)
    }.map { clickEvent =>
      println(clickEvent.time)
    }
  }

  ssc.start()
  ssc.awaitTermination()
}

object AvroUtil {
  val reader = new SpecificDatumReader[ClickEvent](ClickEvent.getClassSchema)

  def clickEventDecode(bytes: Array[Byte]): ClickEvent = {
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}
Created 01-20-2016 02:30 PM
You might be missing dependencies; that class is part of the Spark project. Try including the dependency in your build, confirm that the versions in your project match the versions on the cluster, and check your classpath. The log also says it cannot load libhadoop, so run service checks for each service.
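One quick way to do the classpath check (just a sketch I'd suggest, not something you have to use) is to ask the class loader which jar, if any, actually provides the missing class. Paste something like this into spark-shell or a throwaway main running on the same classpath as the failing job:

// Sketch: locate the jar (if any) that provides the missing Spark Streaming class.
object ClasspathCheck extends App {
  val className = "org.apache.spark.streaming.util.WriteAheadLogUtils$"
  val resource  = className.replace('.', '/') + ".class"
  val url = Option(getClass.getClassLoader.getResource(resource))
  // Prints the jar URL when the class is on the classpath, otherwise a "not found" message.
  println(url.map(_.toString).getOrElse(s"$className not found on the classpath"))
}

If it prints "not found", the class simply isn't in your fat jar or on the cluster's Spark assembly, which matches the NoClassDefFoundError you're seeing.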
Created 01-20-2016 02:56 PM
I am compiling this Scala project with SBT from the command prompt. Here is the build file:

javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint")

lazy val root = (project in file(".")).
  settings(
    name := "helo",
    version := "1",
    scalaVersion := "2.10.4"
  )

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.4" % "test"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"

// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
Created 01-20-2016 02:59 PM
@Shahzad Aslam I see you're using Spark 1.2; can you try 1.4.1, or whatever version is on your cluster?
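If you're not sure what version the cluster actually ships, one way to confirm (just a suggestion, assuming you have spark-shell access on a cluster node) is to print the version reported by the running context:

// In spark-shell the context is already available as `sc`;
// this prints the Spark version string the cluster is actually running.
println(sc.version)

Running spark-submit with the --version flag should report the same information from the command line.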
Created 01-20-2016 03:13 PM
I am using HDP 2.2.4.2-2, and the Spark version installed with it is 1.2.1.
Please suggest changes to the build file. I am also always confused about which version to use with what; what are the general guidelines? I come from a Microsoft background, and Java compatibility and dependencies always bite me hard. With SBT I found a workaround: compile a fat jar with sbt assembly and then cp it :) but this time that trick is not working either :(
Created 01-20-2016 03:18 PM
The best practice is to use the Hortonworks-provided repositories rather than Apache's. I have to double-check on Spark Streaming availability in 1.2.1, but looking at your build file: if your cluster's Spark is 1.2, make sure everything else is 1.2; don't mix and match 1.4.1 with 1.2. Use the latest stable release on the 1.2 branch, which is 1.2.1.
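For the repository part of that advice, the sbt equivalent is adding a resolver. A minimal sketch; note that the repository URL below is an assumption on my part and should be confirmed against the HDP documentation:

// Sketch of an sbt resolver entry for the Hortonworks repository.
// NOTE: the URL is assumed; verify it in the HDP docs before relying on it.
resolvers += "Hortonworks Releases" at "http://repo.hortonworks.com/content/repositories/releases/"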
Created 01-20-2016 03:25 PM
@Shahzad Aslam I just checked our documentation: the latest Spark available with HDP 2.2.9 is Spark 1.3.1, and there are a lot of improvements going from 1.2.1 to 1.3.1, so perhaps you can do a rolling upgrade from 2.2.4 to 2.2.9 to get the enhancements and fixes. Otherwise, use version 1.2.1 for all dependencies in your build. Spark Streaming is included in 1.2.1.
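Concretely, aligning the dependencies from your earlier build file to a single Spark version would look roughly like this. This is only a sketch assuming the cluster runs Spark 1.2.1; marking the core Spark modules as "provided" (my suggestion, not a requirement) keeps the cluster's own jars authoritative at runtime, while the Kafka connector still has to go into the fat jar because it is not part of the cluster's Spark assembly:

// Sketch: all Spark artifacts pinned to the cluster's version (assumed 1.2.1 here).
libraryDependencies += "org.apache.spark" % "spark-core_2.10"            % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10"       % "1.2.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10"             % "1.2.1" % "provided"
// The Kafka connector is not shipped in the Spark assembly, so it stays in the assembled jar.
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.1"

Rebuild with sbt assembly after the change so the fat jar no longer contains the 1.4.1 connector compiled against classes your 1.2.x Spark doesn't have.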
Created 01-20-2016 03:27 PM
@Shahzad Aslam Actually, Spark Streaming is a technical preview and not guaranteed to work 100% in HDP 2.2; take a look at this doc. The only supported Spark Streaming release is Spark 1.5.2, available with HDP 2.3.4. You will need to upgrade your cluster.
Created 01-20-2016 03:45 PM
Huh... OK, so that means I need to either upgrade my HDP or align all dependencies to matching versions.
Created 01-20-2016 03:52 PM
Yes, matching the versions will not guarantee it works, because Spark Streaming is in Technical Preview below 1.5.2, but it's less involved than doing a cluster upgrade. Try it out first and let us know; otherwise, communicate to your team that you need to upgrade. You can also look at Storm: it is stable and production-ready in HDP 2.2 and can give Spark Streaming a run for its money, since it processes per event rather than in micro-batches as Spark does.