05-10-2023 07:39 AM
By the way, when we change the configuration to "spark.streaming.stopGracefullyOnShutdown": "false" and kill the job, we see that the commits are in sync between the Kafka consumer group and our ingested offset information.
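In code, that change is just the one flag on the SparkConf (a sketch of the equivalent programmatic setting; in our case it comes from the JSON config below):

import org.apache.spark.SparkConf

// With graceful shutdown disabled, a kill leaves the commits in sync for us.
val sparkConf = new SparkConf()
  .set("spark.streaming.stopGracefullyOnShutdown", "false")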
05-10-2023 07:31 AM
We are migrating to CDP, and we have some streaming jobs using the DStreams API. The source is a Kafka topic with 3 partitions, the sink is HBase, and offsets are committed to Kafka using the Kafka integration, as per this documentation: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

We commit only after the actual data ingestion to the HBase table has completed:

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
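In simplified form, our per-batch loop looks like this (a sketch; writeToHbase is a placeholder for our actual HBase ingestion code, and stream is the direct stream created further below):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Capture the exact offset ranges of this micro-batch (we also log these to HBase).
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Ingest the batch into HBase first ...
  writeToHbase(rdd) // placeholder for our actual HBase write path

  // ... and only then commit the processed offsets back to Kafka, asynchronously.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}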
"sasl.kerberos.service.name": "kafka",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG": "StringDeserializer.class"
"auto.offset.reset": "earliest",
"group.id": "STREAMING_JOB_GROUPID_101",
"enable.auto.commit": "false" streaming configurations "spark": {
"sparkConf": {
"spark.app.name": "STREAMING_JOB_APPLICATION_NAME",
"spark.streaming.stopGracefullyOnShutdown": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.streaming.kafka.maxRatePerPartition": "12"
  },
  "streamingContext": {
    "batchMilliSeconds": 10000
  }
}
Also, we store the offset information of every micro-batch in an HBase table, as log information:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

However, our observation is that when we kill the application using "yarn application -kill <application id>", the committed offsets in the Kafka consumer group are ahead of the actual offsets that were processed in the last successful batch ingestion. So when we restart the job, it starts from the committed offsets (ahead of the data actually ingested to HBase), and this is causing data loss for us.

Sample Kafka offsets stored:
GROUP                           TOPIC                      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST  CLIENT-ID
STREAMING_JOB_APPLICATION_NAME  topic.MY_STREAMING_TOPIC   2          3035673         3089643         53970  -            -     -
STREAMING_JOB_APPLICATION_NAME  topic.MY_STREAMING_TOPIC   1          3035678         3089648         53970  -            -     -
STREAMING_JOB_APPLICATION_NAME  topic.MY_STREAMING_TOPIC   0          3035682         3089651         53969  -
Actual offsets from the last successfully ingested batch:
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 2, range: [3035553 -> 3035673])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 0, range: [3035562 -> 3035682])
TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 1, range: [3035558 -> 3035678])

Could you please elaborate on this behavior?