kafka + kafka shutdown because InvalidOffsetException

Hi all,

In the Kafka server.log we see many messages like "FATAL Fatal error during KafkaServer startup. Prepare to shutdown"

and "kafka.common.InvalidOffsetException: Attempt to append an offset (232884366) to position 203880 no larger than the last offset appended".

Do these messages explain why we can't start the Kafka broker on the Kafka machine?

Second, what is the best solution for this issue?

FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InvalidOffsetException: Attempt to append an offset (232884366) to position 203880 no larger than the last offset appended (232884366) to /var/kafka/kafka-logs/wctpi.avro.pri.processed-59/00000000000124356738.index.
at kafka.log.OffsetIndex$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:132)
at kafka.log.OffsetIndex$anonfun$append$1.apply(OffsetIndex.scala:122)
at kafka.log.OffsetIndex$anonfun$append$1.apply(OffsetIndex.scala:122)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:233)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:122)
at kafka.log.LogSegment.recover(LogSegment.scala:225)
at kafka.log.Log$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$anonfun$loadSegments$4.apply(Log.scala:179)
at scala.collection.TraversableLike$WithFilter$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at kafka.log.LogManager$anonfun$loadLogs$2$anonfun$3$anonfun$apply$10$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.InvalidOffsetException: Attempt to append an offset (232884366) to position 203880 no larger than the last offset appended (232884366) to /var/kafka/kafka-logs/wctpi.avro.pri.processed-59/00000000000124356738.index.
at kafka.log.OffsetIndex$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:132)
at kafka.log.OffsetIndex$anonfun$append$1.apply(OffsetIndex.scala:122)
at kafka.log.OffsetIndex$anonfun$append$1.apply(OffsetIndex.scala:122)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:233)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:122)
at kafka.log.LogSegment.recover(LogSegment.scala:225)
at kafka.log.Log$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$anonfun$loadSegments$4.apply(Log.scala:179)
at scala.collection.TraversableLike$WithFilter$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at kafka.log.LogManager$anonfun$loadLogs$2$anonfun$3$anonfun$apply$10$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Michael-Bronson
10 REPLIES

Re: kafka + kafka shutdown because InvalidOffsetException

Super Collaborator

I am sure the message is the reason why you can't start the broker, as it gets shut down again during startup.

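The docs state that an exception of this name is "Thrown when the offset for a set of partitions is invalid (either undefined or out of range), and no reset policy has been configured." Note that this description is for the consumer-side InvalidOffsetException; the kafka.common.InvalidOffsetException in your trace is raised by the broker in kafka.log.OffsetIndex.append while it recovers a log segment, so the description may not apply directly.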

One solution is to shut down the broker and delete the log, but that will cause data loss, so it may not be what you need.

Some hints I found on resetting offsets (these always reset consumer offsets, not producer offsets, so they may not help in your case):

https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

Re: kafka + kafka shutdown because InvalidOffsetException

Super Collaborator

This is also mentioned here: https://issues.apache.org/jira/browse/KAFKA-4502
Perhaps this is helpful for you; it mentions that a leftover *.index.swap file is causing the issue.

Re: kafka + kafka shutdown because InvalidOffsetException

@Harald under /var/kafka/kafka-logs we have many logs. How do we know which logs we need to delete?

Michael-Bronson

Re: kafka + kafka shutdown because InvalidOffsetException

Second, which approach should we use to delete the logs?

Michael-Bronson

Re: kafka + kafka shutdown because InvalidOffsetException

Super Collaborator

Just be aware that the logs contain the messages from the Kafka topics. If you delete them, they are simply gone, which can break the principle of guaranteed delivery.

The logs I would delete (or perhaps save somewhere else) are the ones from the topic partition causing the issue, which seems to be /var/kafka/kafka-logs/wctpi.avro.pri.processed-59, and within this partition the index file /var/kafka/kafka-logs/wctpi.avro.pri.processed-59/00000000000124356738.index
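To show how the directory and file name can be read off the exception text, here is a minimal Python sketch. The regular expression is written only against the message quoted in the question, so treat it as an assumption that other Kafka versions word the message the same way; `parse_invalid_offset` and the keys of the returned dict are hypothetical names chosen for illustration.

```python
import re
from pathlib import PurePosixPath

# Matches the InvalidOffsetException message as quoted in this thread.
# Assumption: the wording is the same in your broker version.
PATTERN = re.compile(
    r"Attempt to append an offset \((?P<offset>\d+)\) "
    r"to position (?P<position>\d+) "
    r"no larger than the last offset appended \((?P<last>\d+)\) "
    r"to (?P<path>\S+?\.index)"
)


def parse_invalid_offset(line):
    """Return the offsets, partition directory and segment name, or None."""
    match = PATTERN.search(line)
    if match is None:
        return None
    index_path = PurePosixPath(match.group("path"))
    return {
        "offset": int(match.group("offset")),
        "last_offset": int(match.group("last")),
        "position": int(match.group("position")),
        # Directory of the affected topic partition.
        "partition_dir": str(index_path.parent),
        # Zero-padded file name without extension, shared by the segment files.
        "segment_prefix": index_path.stem,
        # The same name read as a number: the first offset in the segment.
        "base_offset": int(index_path.stem),
    }
```

Fed the exception line from server.log, this should give the partition directory and the 00000000000124356738 prefix named above.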

So I would first try to delete/move the files

/var/kafka/kafka-logs/wctpi.avro.pri.processed-59/00000000000124356738*

and then try to restart the broker. If it works, there is nothing more to do; if you still have the issue, move all files below /var/kafka/kafka-logs/wctpi.avro.pri.processed-59/ and start again. After this Kafka should start again.
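A minimal Python sketch of the "move rather than delete" step, assuming the broker is stopped before it runs. The function name `move_segment_files` and the backup location are hypothetical; moving keeps the files recoverable if the restart does not help.

```python
import shutil
from pathlib import Path


def move_segment_files(partition_dir, segment_prefix, backup_dir):
    """Move every file starting with segment_prefix into backup_dir.

    Returns the sorted list of file names that were moved.
    Run this only while the broker is stopped.
    """
    src = Path(partition_dir)
    dst = Path(backup_dir)
    dst.mkdir(parents=True, exist_ok=True)
    moved = []
    for path in sorted(src.glob(segment_prefix + "*")):
        if path.is_file():
            shutil.move(str(path), str(dst / path.name))
            moved.append(path.name)
    return moved
```

For the case in this thread the call would look like `move_segment_files("/var/kafka/kafka-logs/wctpi.avro.pri.processed-59", "00000000000124356738", "/some/backup/dir")`, where the backup path is a placeholder you would choose yourself.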

Re: kafka + kafka shutdown because InvalidOffsetException

Regarding https://issues.apache.org/jira/browse/KAFKA-4502: it is not clear what the solution for this bug is (note: we run Kafka on a Linux machine).

Michael-Bronson

Re: kafka + kafka shutdown because InvalidOffsetException

Super Collaborator

First check whether you have a *.index.swap file at all in the logs dir. If so, move it and start the broker.
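One way to run that check over all partition directories at once is sketched below in Python. `find_swap_files` is a hypothetical helper name, and the log root you pass in (for example /var/kafka/kafka-logs from the question) is whatever your broker is configured to use.

```python
from pathlib import Path


def find_swap_files(log_root):
    """Return all *.index.swap files below log_root, sorted by path."""
    return sorted(Path(log_root).rglob("*.index.swap"))
```

An empty result would suggest that a leftover swap file is not the cause in your case.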

Re: kafka + kafka shutdown because InvalidOffsetException

I will check that, but can we also delete all files that end with ".index"?

Michael-Bronson

Re: kafka + kafka shutdown because InvalidOffsetException

Super Collaborator

There are always pairs of *.log and *.index files. I would delete/move both together.
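To check the pairing before moving anything, a small Python sketch like the following could list segments that have only one of the two files. The helper names are hypothetical, and the sketch looks only at .log and .index; any other files in the directory (newer Kafka versions also write .timeindex files, for instance) are ignored.

```python
from pathlib import Path

EXPECTED = {".log", ".index"}


def segment_pairs(partition_dir):
    """Map each segment name to the set of suffixes (.log, .index) found."""
    pairs = {}
    for path in Path(partition_dir).iterdir():
        if path.suffix in EXPECTED:
            pairs.setdefault(path.stem, set()).add(path.suffix)
    return pairs


def unpaired_segments(partition_dir):
    """Return the sorted segment names that lack a .log or an .index file."""
    return sorted(
        stem
        for stem, suffixes in segment_pairs(partition_dir).items()
        if suffixes != EXPECTED
    )
```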