Member since 05-09-2023
Posts: 2, Kudos Received: 0, Solutions: 0
			
    
	
		
		
05-10-2023 07:39 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
By the way, when we change the configuration to "spark.streaming.stopGracefullyOnShutdown": "false" and kill the job, the offsets committed to the Kafka consumer group are in sync with our ingested offset information.
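One way to make the "in sync" check concrete is to compare, per partition, the offset committed to the consumer group with the end of the last range logged for an ingested batch. The sketch below is plain Scala with no Spark dependency; `OffsetCheck`, `LoggedRange`, and `commitLead` are hypothetical names used only for illustration, and `LoggedRange` merely mirrors the fields of Spark's `OffsetRange` (where the end offset is exclusive).

```scala
object OffsetCheck {
  // Hypothetical stand-in for Spark's OffsetRange; untilOffset is exclusive.
  final case class LoggedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // For each partition present in both inputs, return
  // (committed offset) - (end of the last ingested range).
  // 0 means in sync; a positive value means the commit is ahead of the ingested data.
  def commitLead(committed: Map[Int, Long], lastIngested: Seq[LoggedRange]): Map[Int, Long] =
    lastIngested.flatMap { r =>
      committed.get(r.partition).map(c => r.partition -> (c - r.untilOffset))
    }.toMap
}
```

With illustrative values, `OffsetCheck.commitLead(Map(0 -> 120L), Seq(OffsetCheck.LoggedRange("t", 0, 100L, 110L)))` returns `Map(0 -> 10L)`, meaning the committed offset for partition 0 is 10 records ahead of what was logged as ingested.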
						
					
				
			
			
			
			
			
			
			
			
			
		
			
    
	
		
		
05-10-2023 07:31 AM
	
	
	
	
	
	
	
	
	
	
	
	
	
	
		
	
				
		
			
					
				
		
	
		
					
We are migrating to CDP and have some streaming jobs that use the DStreams API. The source is a Kafka topic with 3 partitions and the sink is HBase. Offsets are committed to Kafka using the Kafka integration, as described in this documentation:

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

We commit only after the data has actually been ingested into the HBase table:

    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

Kafka configs:

    "security.protocol": "SASL_SSL",
    "sasl.kerberos.service.name": "kafka",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG": "StringDeserializer.class",
    "auto.offset.reset": "earliest",
    "group.id": "STREAMING_JOB_GROUPID_101",
    "enable.auto.commit": "false"

Streaming configurations:

    "spark": {
      "sparkConf": {
        "spark.app.name": "STREAMING_JOB_APPLICATION_NAME",
        "spark.streaming.stopGracefullyOnShutdown": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.streaming.kafka.maxRatePerPartition": "12"
      },
      "streamingContext": {
        "batchMilliSeconds": 10000
      }
    }

We also store the offset information of every micro-batch in an HBase table as log information:

    rdd.asInstanceOf[HasOffsetRanges].offsetRanges

However, our observation is that when we kill the application using "yarn application -kill <application id>", the offsets committed in the Kafka consumer group are ahead of the offsets that were actually processed in the last successfully ingested batch. So when we restart the job, it starts from the committed offsets (which are ahead of the data actually ingested into HBase). This is causing data loss for us.

Sample

Kafka offsets stored:

    STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    2          3035673         3089643         53970           -               -               -
    STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    1          3035678         3089648         53970           -               -               -
    STREAMING_JOB_APPLICATION_NAME topic.MY_STREAMING_TOPIC    0          3035682         3089651         53969           -

Actual offsets of the last successfully ingested batch:

    TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 2, range: [3035553 -> 3035673])
    TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 0, range: [3035562 -> 3035682])
    TOPIC : OffsetRange(topic: 'topic.MY_STREAMING_TOPIC', partition: 1, range: [3035558 -> 3035678])

Could you please elaborate on this behavior?
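
For reference, the commit-after-write pattern described above can be sketched roughly as follows. This is a minimal sketch, not the job's actual code: `CommitAfterWrite`, `run`, and `writeToHBase` are hypothetical names, the HBase write is stubbed out, and `stream` is assumed to be the direct result of `KafkaUtils.createDirectStream` (the casts to `HasOffsetRanges` and `CanCommitOffsets` only succeed on that stream, not after transformations). The optional `OffsetCommitCallback` simply logs when a commit completes, which may help correlate commits with ingested batches.

```scala
import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object CommitAfterWrite {
  // Hypothetical sink: stands in for the job's real HBase write.
  def writeToHBase(records: Iterator[ConsumerRecord[String, String]]): Unit = ()

  def run(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      // Read the offset ranges from the RDD produced directly by the stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Write the batch first; an exception here skips the commit below.
      rdd.foreachPartition(records => writeToHBase(records))

      // Request the commit only after the write has returned.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(
        offsetRanges,
        new OffsetCommitCallback {
          override def onComplete(
              offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
              exception: Exception): Unit = {
            if (exception != null) println(s"Offset commit failed: $exception")
            else println(s"Offset commit completed: $offsets")
          }
        }
      )
    }
  }
}
```

As the linked integration guide notes, `commitAsync` is asynchronous and Kafka is not transactional, so the output should still be idempotent even with this ordering.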
						
					
				
			
			
			
			
			
			
			
			
			
		 
        
