Created 03-23-2018 08:00 PM
Using spark streaming with kafka and creating a direct stream using below code-
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> conf.getString("kafka.brokers"),
"zookeeper.connect" -> conf.getString("kafka.zookeeper"),
"group.id" -> conf.getString("kafka.consumergroups"),
"auto.offset.reset" -> args { 1 },
"enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"security.protocol" -> "SASL_PLAINTEXT",
"session.timeout.ms" -> args { 2 },
"max.poll.records" -> args { 3 },
"request.timeout.ms" -> args { 4 },
"fetch.max.wait.ms" -> args { 5 })
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
After some processing we commit the offset using commitAsync API.
try
{
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
}
catch
{
case e:Throwable => e.printStackTrace()
}
Below error causes the job to crash-
18/03/20 10:43:30 INFO ConsumerCoordinator: Revoking previously assigned partitions [TOPIC_NAME-3, TOPIC_NAME-5, TOPIC_NAME-4] for group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:43:30 INFO AbstractCoordinator: (Re-)joining group 21_feb_reload_2
18/03/20 10:44:00 INFO AbstractCoordinator: Successfully joined group 21_feb_reload_2 with generation 20714
18/03/20 10:44:00 INFO ConsumerCoordinator: Setting newly assigned partitions [TOPIC_NAME-1, TOPIC_NAME-0, TOPIC_NAME-2] for group 21_feb_reload_2
18/03/20 10:44:00 ERROR JobScheduler: Error generating jobs for time 1521557010000 ms
java.lang.IllegalStateException: No current assignment for partition TOPIC_NAME-4
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
18/03/20 10:44:00 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalStateException: No current assignment for partition
My findings -
1- Similar issue from the post -Kafka Spark Stream throws Exception:No current assignment for partition This does not give much explanation to why use Assign rather than Subscribe.
2- Tying to make sure there is no re-balancing, I increased the session.timeout.ms to almost my batch duration as my processing gets completed in less than 2 min(batch duration).
session.timeout.ms- The amount of time a consumer can be out of contact with the brokers while still considered alive (https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html)
3- Came across Re-balance Listeners with methods - a onPartitionsRevoked b onPartitionsAssigned
But was unable to understand how can I use the first one which commits offset just before rebalancing.
Any Inputs will be much appreciated
Created 03-28-2018 02:08 PM