Spark Structured Streaming with Kafka log compaction topic

I am running my Structured Streaming job in batches at a one-hour interval. After a few batches complete successfully, the offset changes back to an old value and the job starts reading old messages again. In the log I get the warning below.

WARN KafkaSource: Partition topicName-29's offset was changed from 1092271004 to 35623, some data may have been missed. Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".

Build.sbt:

scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.kafka" % "kafka-clients" % "0.11.0.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided"
)

Code:

val readMessages = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("subscribe", TOPIC_NAME)
  .option("failOnDataLoss", "false")
  .option("kafka.security.protocol", "ssl")
  .option("kafka.ssl.keystore.location", propertyMap(CONSUMER_KEYSTORE_LOCATION))
  .option("kafka.ssl.keystore.password", propertyMap(CONSUMER_KEYSTORE_PASSWORD))
  .option("kafka.ssl.truststore.location", propertyMap(CONSUMER_TRUSTSTORE_LOCATION))
  .option("kafka.ssl.truststore.password", propertyMap(CONSUMER_TRUSTSTORE_PASSWORD))
  .option("startingOffsets", "earliest")
  .option("groupIdPrefix", "kafka-ingestion")
  .load()


import org.apache.spark.sql.streaming.Trigger

val streamingQuery = readMessages
  .selectExpr("CAST(value AS STRING)") // the text sink accepts only a single string column
  .writeStream
  .format("text")
  .trigger(Trigger.Once)
  .option("path", propertyMap(TARGET_PATH))
  .option("checkpointLocation", propertyMap(CHECKPOINT_PATH))
  .outputMode("append")
  .start()

streamingQuery.awaitTermination()

My guess is that whenever incremental data is available in the Kafka topic it works fine, but when there is no incremental data the offset changes back to an older value and the job starts fetching data that has already been processed.
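
A rough way to check that guess is to ask Kafka directly for the earliest and latest offsets of the affected partition and compare them with the offsets in the warning and in the checkpoint. This is only a sketch using the plain consumer API; the broker address, SSL paths, topic name and partition number 29 (taken from the warning above) are placeholders, not my real values:

import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object OffsetRangeCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9093")                  // placeholder for BOOTSTRAP_SERVERS
    props.put("security.protocol", "SSL")                           // same SSL settings as the streaming job
    props.put("ssl.keystore.location", "/path/to/keystore.jks")     // placeholder
    props.put("ssl.keystore.password", "********")                  // placeholder
    props.put("ssl.truststore.location", "/path/to/truststore.jks") // placeholder
    props.put("ssl.truststore.password", "********")                // placeholder
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // partition 29 is the one named in the warning; in practice check every partition
      val tp = new TopicPartition("topicName", 29)
      val partitions = java.util.Arrays.asList(tp)
      val earliest = consumer.beginningOffsets(partitions).asScala
      val latest = consumer.endOffsets(partitions).asScala
      println(s"partition 29: earliest=${earliest(tp)}, latest=${latest(tp)}")
    } finally {
      consumer.close()
    }
  }
}

If the earliest offset the broker reports is higher than the offset recorded in the checkpoint, the records really are gone (aged out or compacted away); if it is not, the rewind looks like something on the Spark side rather than actual data loss.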

The topic is a compacted topic with a retention period of 7 days.

I have tried startingOffsets with both earliest and latest. I have also tried changing failOnDataLoss, but it does not work as expected.
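
For reference, these are the kinds of variants I have been trying (sketch only; the SSL and other options are the same as in the readStream block above, and the offset in the JSON example is just the value from the warning, used for illustration). As far as I understand, startingOffsets only applies when a query starts without an existing checkpoint; on restart the query resumes from the checkpoint, so changing that option alone makes no difference.

// a) fail the query instead of silently rewinding when offsets disappear
val failFast = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("subscribe", TOPIC_NAME)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "true")
  .load()

// b) pin explicit per-partition starting offsets as JSON (offset value is illustrative)
val pinned = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("subscribe", TOPIC_NAME)
  .option("startingOffsets", """{"topicName":{"29":1092271004}}""")
  .load()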