Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Contributor

Hi,

I am trying to run the program below in Scala IDE. It reads messages from a Kafka topic, but I get the following Spark exception and I am not sure how to proceed. I can retrieve the messages from the command line, but the program doesn't seem to run. Any help will be greatly appreciated.

16/11/22 11:20:56 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([REST,0])
    at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at ca.SparkCassandra.Kafka$.main(Kafka.scala:32)
    at ca.SparkCassandra.Kafka.main(Kafka.scala)
16/11/22 11:20:57 INFO SparkContext: Invoking stop() from shutdown hook
16/11/22 11:20:57 INFO SparkUI: Stopped Spark web UI at http://10.63.48.172:4040
16/11/22 11:20:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

package ca.SparkCassandra

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Kafka {

  def main(args: Array[String]): Unit = {
    // Local streaming context using all cores, with a 5-second batch interval
    val ssc = new StreamingContext("local[*]", "Kafka", Seconds(5))

    // Kafka 0.8 direct API settings: the broker to bootstrap from, and
    // start from the earliest offset still on the broker
    val kafkaParams = Map(
      "metadata.broker.list" -> "192.168.30.130:6667",
      "auto.offset.reset" -> "smallest")

    val topics = Set("REST")

    // Create the direct stream with the Kafka parameters and topics,
    // keeping only the message values (dropping the keys)
    val kafkaStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)

    kafkaStream.print()

    ssc.checkpoint("c:/checkpoint/")
    ssc.start()
    ssc.awaitTermination()
  }
}
Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Expert Contributor

Check out this topic on Stack Overflow; I think the first answer is what you need to try.

http://stackoverflow.com/questions/34288449/spark-streaming-kafka-sparkexception-couldnt-find-leader...

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Contributor

Thanks. I tried this and got the following:

[root@sandbox bin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "192.168.30.130:6667" --topic "REST" --time -1
{metadata.broker.list=192.168.30.130:6667, request.timeout.ms=1000, client.id=GetOffsetShell, security.protocol=PLAINTEXT}

REST:0:446
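Each result line from GetOffsetShell has the form `topic:partition:offset`. With `--time -1` it reports the latest (log-end) offset of each partition, so `REST:0:446` means partition 0 of topic `REST` will write its next message at offset 446. It also shows that the broker at 192.168.30.130:6667 answers offset requests when queried from inside the VM. The sketch below parses such output into a map keyed by (topic, partition); `OffsetShellOutput`, `parseLine` and `parse` are hypothetical helpers, not part of Kafka or Spark.

```scala
import scala.util.Try

// Hypothetical helper: parses GetOffsetShell result lines of the form
// "topic:partition:offset" (e.g. "REST:0:446") into ((topic, partition), offset).
object OffsetShellOutput {

  def parseLine(line: String): Option[((String, Int), Long)] =
    line.trim.split(":") match {
      case Array(topic, partition, offset) =>
        // Keep the line only if partition and offset are both numeric
        for {
          p <- Try(partition.toInt).toOption
          o <- Try(offset.toLong).toOption
        } yield (topic, p) -> o
      case _ =>
        None // not a result line, e.g. the "{metadata.broker.list=...}" config echo
    }

  def parse(lines: Seq[String]): Map[(String, Int), Long] =
    lines.flatMap(line => parseLine(line).toList).toMap
}
```

Fed the two output lines above, `parse` skips the config echo and returns `Map(("REST", 0) -> 446L)`. For the Kafka 0.8 integration, `KafkaUtils.createDirectStream` also has an overload that takes explicit starting offsets as a `Map[TopicAndPartition, Long]` plus a message handler; converting a map like this one into that shape is an assumption to check against the Spark version in use.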

How do I alter my code to consume from there? Sorry, I am a newbie at this; any guidance would be really helpful.

Thanks

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Expert Contributor

When you create the Kafka stream, add this parameter to kafkaParams:

"auto.offset.reset" -> "smallest"

Without it, the direct stream starts from the latest offset, so it only reads messages produced after the job starts.
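The rule can be modelled roughly as follows. This is a simplified sketch, assuming the Kafka 0.8 direct API behaviour where `"smallest"` selects the earliest offset still on the broker and any other value (or no value) selects the latest; `StartingOffset.choose` is a hypothetical name. Note that in the real API both branches first have to look up the partition leader, so if the leader cannot be reached the stream still fails with "Couldn't find leader offsets", whatever this setting is.

```scala
// Simplified model (assumption) of how the Kafka 0.8 direct stream picks the
// starting offset for a partition when no explicit offsets are supplied.
object StartingOffset {

  def choose(kafkaParams: Map[String, String], earliest: Long, latest: Long): Long =
    kafkaParams.get("auto.offset.reset").map(_.toLowerCase) match {
      case Some("smallest") => earliest // replay everything still retained
      case _                => latest   // default: only messages produced from now on
    }
}
```

For a partition whose retained offsets run from 0 to 446, `"smallest"` starts at 0, while leaving the setting out starts at 446 and prints nothing until new messages arrive.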

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Contributor

Thanks, but I already had that in my code.

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Expert Contributor

Is your Kafka cluster on the same cluster as your Spark Streaming application? There could be a firewall issue. Are you running in yarn-cluster or yarn-client mode?

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Contributor

I am running Kafka in the Sandbox VM and using the Spark installation on my Windows machine.
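With Kafka inside the VM and the Spark driver on the Windows host, a first check is whether the Windows machine can open a TCP connection to the broker at all. The 0.8 direct API asks the broker in `metadata.broker.list` for each partition's leader and then connects to the leader host and port that the broker reports, so (as an assumption about this setup, not something confirmed in the thread) both that address and the host name the broker reports for itself need to resolve and be reachable from Windows. The sketch below uses only `java.net`; `BrokerReachability.canConnect` is a hypothetical helper.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper: returns true if a TCP connection to host:port can be
// opened from this machine within timeoutMs milliseconds.
object BrokerReachability {

  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      // Throws UnknownHostException (an IOException) if the host name cannot be resolved
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or unresolvable
    } finally {
      socket.close()
    }
  }
}
```

Running `BrokerReachability.canConnect("192.168.30.130", 6667)` from the Windows machine, and repeating it with whatever host name the broker advertises, separates a plain network or firewall problem from a problem in the Spark code.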

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Expert Contributor

Can you confirm that there are messages in the topic?

Re: Kafka-Spark Streaming application error -Couldn't find leader offsets for Set

Contributor

Yes, I was able to view the topic and its messages from the console on the server.
