
Spark streaming generating error when reading kafka topic


Explorer

I am completely at a loss with this error; I have been struggling with it for two days. My Kafka producer is working fine, emitting a ClickEvent every second, and I have confirmed that the events are being posted to the Kafka topic in Avro format.

Now I want to read them back in Spark Streaming; the code is at the end. I get the error "java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$". I have searched everywhere for WriteAheadLogUtils and found nothing. Please help me resolve this issue.

16/01/20 14:17:37 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox.hortonworks.com:38373/user/HeartbeatReceiver
16/01/20 14:17:37 INFO NettyBlockTransferService: Server created on 60697
16/01/20 14:17:37 INFO BlockManagerMaster: Trying to register BlockManager
16/01/20 14:17:37 INFO BlockManagerMasterActor: Registering block manager localhost:60697 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 60697)
16/01/20 14:17:37 INFO BlockManagerMaster: Registered BlockManager
16/01/20 14:17:39 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error: application failed with exception
java.lang.NoClassDefFoundError: org/apache/spark/streaming/util/WriteAheadLogUtils$
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
        at streamingAvroConsumer$delayedInit$body.apply(streamingAvroConsumer.scala:44)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
        at streamingAvroConsumer$.main(streamingAvroConsumer.scala:18)
        at streamingAvroConsumer.main(streamingAvroConsumer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:367)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.util.WriteAheadLogUtils$
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 18 more



The code is below:

import events.avro.ClickEvent
import kafka.serializer.DefaultDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object streamingAvroConsumer extends App {
  println("Initializing App")

  val numThreads = "1"
  val topics = "testing2"

  val sparkConf = new SparkConf().setAppName("WindowClickCount").setMaster("local[2]")

  // The slide duration of a windowed DStream must be a multiple of the parent
  // DStream's batch interval; we chose a 2-second batch interval here.
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // Checkpointing is required for stateful operations such as reduceByKeyAndWindow.
  ssc.checkpoint("./checkpointDir")

  val kafkaConf = Map(
    "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
    "zookeeper.connect" -> "sandbox.hortonworks.com:2181",
    "group.id" -> "kafka-spark-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000")

  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

  // Create a stream of raw byte arrays; each message is an Avro-encoded ClickEvent.
  // DefaultDecoder yields Array[Byte] for both key and value.
  val lines = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)

  // Decode each message and print its timestamp. Note that a DStream needs an
  // output operation (foreachRDD, print, ...) or Spark Streaming will not run anything.
  val clickEvents = lines.map(bytes => AvroUtil.clickEventDecode(bytes))
  clickEvents.foreachRDD { rdd =>
    rdd.foreach(clickEvent => println(clickEvent.time))
  }

  ssc.start()
  ssc.awaitTermination()
}

object AvroUtil {
  // A single reusable reader for the generated ClickEvent schema.
  val reader = new SpecificDatumReader[ClickEvent](ClickEvent.getClassSchema)

  def clickEventDecode(bytes: Array[Byte]): ClickEvent = {
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}


10 Replies

Re: Spark streaming generating error when reading kafka topic

Mentor
@Shahzad Aslam

You might be missing dependencies; the class is part of the Spark project. Include the dependency in your build, make sure the versions in your project match those on the cluster, and check your classpath. The log also says it cannot load libhadoop, so run service checks for each service.
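
As a minimal sketch of the dependency point (assuming sbt, which a later post shows this project uses, and assuming the cluster's Spark is 1.2.1), the Kafka receiver lives in its own artifact that must be declared explicitly and bundled into your application jar, since clusters do not ship it:

// spark-streaming-kafka is separate from spark-core/spark-streaming; declare it
// explicitly and keep its version identical to the cluster's Spark version.
// "1.2.1" here is an assumption; use whatever your cluster actually runs.
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.1"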

Re: Spark streaming generating error when reading kafka topic

Explorer
I am compiling this Scala project with sbt from the command prompt; here is the build file.


javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint")

lazy val root = (project in file(".")).
  settings(
    name := "helo",
    version := "1",
    scalaVersion := "2.10.4"
  )

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.2.0"
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.4" % "test"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"

// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}




Re: Spark streaming generating error when reading kafka topic

Mentor

@Shahzad Aslam I see you're using Spark 1.2; can you try 1.4.1, or whatever version is on your cluster?


Re: Spark streaming generating error when reading kafka topic

Explorer

I am using HDP 2.2.4.2-2, and the Spark version installed on it is 1.2.1.

Please suggest changes to the build file. Also, I am always confused about which versions go together; what are the general guidelines? I come from a Microsoft background, and Java compatibility and dependency issues always bite me hard. With sbt I found a workaround: compile a fat jar with sbt assembly and then copy it over :) but this time the trick is not working :(


Re: Spark streaming generating error when reading kafka topic

Mentor
@Shahzad Aslam

Best practice is to use the Hortonworks-provided repositories rather than Apache's. I have to double-check Spark Streaming's availability in 1.2.1, but looking at your build file: if the Spark on your cluster is 1.2, make sure every Spark dependency is 1.2 as well; don't mix 1.4.1 with 1.2. Use the latest stable release of the 1.2 branch, which is 1.2.1. The mismatch is very likely the direct cause of your error: spark-streaming-kafka 1.4.1 references classes, such as WriteAheadLogUtils, that do not exist in a Spark 1.2 runtime.
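
As a minimal sketch (again assuming sbt, as in the build file above, and assuming the cluster reports 1.2.1), you could pin every Spark artifact to a single version; marking the cluster-provided artifacts as "provided" also keeps the assembly jar small:

// Sketch of a build.sbt with all Spark artifacts pinned to one version.
// "1.2.1" is an assumption; use whatever version your cluster actually runs.
val sparkVersion = "1.2.1"

libraryDependencies ++= Seq(
  // Provided by the cluster at runtime, so excluded from the fat jar.
  "org.apache.spark" % "spark-core_2.10"      % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % sparkVersion % "provided",
  "org.apache.spark" % "spark-sql_2.10"       % sparkVersion % "provided",
  // Not shipped with the cluster's Spark, so it must go into the assembly jar.
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVersion
)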


Re: Spark streaming generating error when reading kafka topic

Mentor

@Shahzad Aslam I just checked our documentation: the latest Spark available with HDP 2.2.9 is Spark 1.3.1, and there are a lot of improvements between 1.2.1 and 1.3.1, so perhaps you can do a rolling upgrade from 2.2.4 to 2.2.9 to pick up those enhancements and fixes. Otherwise, use version 1.2.1 for all Spark dependencies in your build file. Spark Streaming is included in 1.2.1.


Re: Spark streaming generating error when reading kafka topic

Mentor

@Shahzad Aslam actually, Spark Streaming is a technical preview in HDP 2.2 and is not guaranteed to work 100%. Take a look at this doc. The only fully supported Spark Streaming release is Spark 1.5.2, available with HDP 2.3.4, so you would need to upgrade your cluster.


Re: Spark streaming generating error when reading kafka topic

Explorer

Hmm... OK, so that means I need to either upgrade my HDP or bring all dependencies to matching versions.


Re: Spark streaming generating error when reading kafka topic

Mentor

Yes, but matching the versions will not guarantee it works, because Spark Streaming is in Technical Preview below 1.5.2; still, it is less involved than a cluster upgrade, so try it first and let us know. Otherwise, tell your team that you need to upgrade. You can also look at Storm: it is stable and production-ready in HDP 2.2, and it can give Spark Streaming a run for its money, since it processes per event rather than micro-batching as Spark does.