I am running HDP with YARN, Kafka 2.10-0.10.1.2.6.3, Spark2, and Zookeeper. I submit the Spark2 jobs to YARN; they store their offsets in Zookeeper and manually fetch and save them. The jobs had been off for more than a week when I adjusted the retention of my Kafka data to 4 days to reduce data size, and now I'm seeing this error:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {global_transactions-2=133030}
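For context, the manual offset bookkeeping in Zookeeper looks roughly like the sketch below. This is illustrative, not my exact code: the znode path layout, the Curator client, and the helper names are hypothetical, with one znode per topic/partition holding the next offset to read.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// Hypothetical sketch of the offset bookkeeping: one znode per
// topic/partition, holding the next offset to read as a string.
val zk = CuratorFrameworkFactory.newClient("zk1:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

def offsetPath(topic: String, partition: Int): String =
  s"/consumers/myapp/offsets/$topic/$partition" // path layout is illustrative

def readOffset(topic: String, partition: Int): Option[Long] =
  Option(zk.checkExists().forPath(offsetPath(topic, partition)))
    .map(_ => new String(zk.getData.forPath(offsetPath(topic, partition))).toLong)

def saveOffset(topic: String, partition: Int, offset: Long): Unit = {
  val path = offsetPath(topic, partition)
  if (zk.checkExists().forPath(path) == null)
    zk.create().creatingParentsIfNeeded().forPath(path, offset.toString.getBytes)
  else
    zk.setData().forPath(path, offset.toString.getBytes)
}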
I have 6 partitions, and for each one the job fetches an offset from Zookeeper that used to be valid. Changing the retention policy to 4 days deleted all of the older data, including the segments those offsets point into, so now the jobs won't start.
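A quick way to confirm the mismatch is to ask Kafka for the earliest offset it still retains per partition and compare it to the stored values. Roughly, using a throwaway consumer (broker address is a placeholder):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "broker1:6667") // placeholder
props.put("group.id", "offset-check")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val partitions = (0 until 6).map(p => new TopicPartition("global_transactions", p))

// beginningOffsets returns the first offset still on disk for each partition;
// any stored offset below these values was deleted by the retention change.
val earliest = consumer.beginningOffsets(partitions.asJava)
earliest.asScala.foreach { case (tp, offset) => println(s"$tp earliest=$offset") }
consumer.close()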
In my Spark2 job I set auto.offset.reset=earliest. Here is the string map I'm passing as the Kafka parameters, followed by the code I'm using to kick off the direct stream.
object Kafka {
  def getParams(topicSet: Set[String], groupId: String, conf: AppConfig): Map[String, String] = {
    Map[String, String](
      "bootstrap.servers" -> conf.kafkaHosts,
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "enable.auto.commit" -> "false"
    )
  }
}
val kafkaData = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](
    topicSet,
    Kafka.getParams(topicSet = topicSet, groupId = appName, conf = config),
    fromOffsets
  )
)
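The fromOffsets map passed to Subscribe is built from the Zookeeper values and is shaped roughly like this. All of the offsets below are illustrative except partition 2's, which is the one from the error:

import org.apache.kafka.common.TopicPartition

// Illustrative shape of fromOffsets: one entry per partition, values read
// from Zookeeper. Partition 2's stored offset (133030) no longer exists
// on disk after the retention change.
val fromOffsets: Map[TopicPartition, Long] = Map(
  new TopicPartition("global_transactions", 0) -> 130000L,
  new TopicPartition("global_transactions", 1) -> 131500L,
  new TopicPartition("global_transactions", 2) -> 133030L
  // ... partitions 3-5 likewise
)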
Can anyone explain why Kafka does not appear to honor the auto.offset.reset setting?
If I change it to something invalid, like test, I get an error back telling me it's not a valid configuration value, so I know Kafka is at least reading the setting.
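For reference, a bare consumer reproduces that validation immediately; a minimal sketch (broker address is a placeholder):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker1:6667") // placeholder
props.put("group.id", "test-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "test") // invalid value

// Fails fast with a ConfigException along the lines of:
// "Invalid value test for configuration auto.offset.reset:
//  String must be one of: latest, earliest, none"
val consumer = new KafkaConsumer[String, String](props)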