Spark Streaming job fails after new partitions are assigned (old ones are revoked) for a Kafka topic: No current assignment for partition topic1

Contributor

I am using Spark Streaming with Kafka, creating a direct stream with the code below:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"), // ignored by the new (0.9+) consumer
  "group.id" -> conf.getString("kafka.consumergroups"),
  "auto.offset.reset" -> args(1),
  "enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "session.timeout.ms" -> args(2),
  "max.poll.records" -> args(3),
  "request.timeout.ms" -> args(4),
  "fetch.max.wait.ms" -> args(5))

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

After some processing, we commit the offsets using the commitAsync API:

try {
  messages.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
} catch {
  case e: Throwable => e.printStackTrace()
}
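
One thing worth double-checking here: a try/catch around foreachRDD only guards the registration of the output operation; exceptions raised while the batches actually run surface later on the driver, typically from ssc.awaitTermination(). A sketch of catching failures per batch instead, where processRecords is a hypothetical stand-in for the processing step:

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    processRecords(rdd) // hypothetical processing step
    // commit only after the batch succeeds, so offsets never move
    // past data that was not actually processed
    messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case e: Throwable => e.printStackTrace()
  }
}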

The error below causes the job to crash:

18/03/20 10:43:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [TOPIC_NAME-3, TOPIC_NAME-5, TOPIC_NAME-4] for group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:44:00 INFO AbstractCoordinator: Successfully joined group 21_feb_reload_2 with generation 20714
18/03/20 10:44:00 INFO ConsumerCoordinator: Setting newly assigned partitions [TOPIC_NAME-1, TOPIC_NAME-0, TOPIC_NAME-2] for group 21_feb_reload_2
18/03/20 10:44:00 ERROR JobScheduler: Error generating jobs for time 1521557010000 ms
java.lang.IllegalStateException: No current assignment for partition TOPIC_NAME-4
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/20 10:44:00 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition

My findings:

1. A similar issue appears in the post "Kafka Spark Stream throws Exception: No current assignment for partition", but it does not explain much about why to use Assign rather than Subscribe (see the first sketch after this list).

2. Trying to make sure there is no rebalancing, I increased session.timeout.ms to almost my batch duration, since my processing completes in less than 2 minutes (the batch duration).

session.timeout.ms: the amount of time a consumer can be out of contact with the brokers while still being considered alive (https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html)

3. I came across rebalance listeners, which provide two callbacks: (a) onPartitionsRevoked and (b) onPartitionsAssigned.

However, I was unable to understand how I can use the first one to commit offsets just before rebalancing (see the second sketch after this list).
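
On finding 1: Assign sidesteps this error because the consumer is pinned to an explicit partition list and never goes through group rebalancing, which is exactly what the log shows invalidating the cached assignment (partition 4 is revoked, then seekToEnd is attempted on it). A minimal sketch, assuming the topic has three partitions and that fromOffsets is recovered from our own offset store via a hypothetical loadOffsetsFromStore helper:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// hypothetical: three partitions, starting offsets from our own store
val topicPartitions = (0 to 2).map(new TopicPartition("TOPIC_NAME", _))
val fromOffsets: Map[TopicPartition, Long] = loadOffsetsFromStore() // hypothetical helper

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](topicPartitions, kafkaParams, fromOffsets))

The trade-off is that Assign turns off group management entirely: no other consumer in the group will be handed these partitions, and tracking offsets and the partition count becomes our responsibility.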
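On finding 3: ConsumerRebalanceListener belongs to the plain Kafka consumer API; Spark's ConsumerStrategies.Subscribe creates its consumer internally and does not expose a hook for passing a listener in, which is why it is hard to apply here. For illustration only, a sketch of the pattern with a standalone KafkaConsumer, where currentOffsets is a hypothetical map that the poll loop keeps up to date:

import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
val currentOffsets = scala.collection.mutable.Map[TopicPartition, OffsetAndMetadata]() // hypothetical: updated by the poll loop

consumer.subscribe(topicsSet.asJava, new ConsumerRebalanceListener {
  // runs while this consumer still owns the partitions, just before
  // they are handed to another member, so the commit cannot be lost
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    consumer.commitSync(currentOffsets.asJava)

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
})

In the direct-stream model, the closest equivalent is what the code above already does: call commitAsync from inside foreachRDD, after the batch's processing has succeeded.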

Any input will be much appreciated.

1 REPLY

Contributor