
Unable to manually commit offsets in a Kafka direct stream (Spark Streaming)



Spark Version: 2.4

Spark Streaming Kafka integration: spark-streaming-kafka-0-10_2.11

 

Note the throw new Exception(); inside the while loop. Even though the batch fails because of this exception, I still see the offsets being committed. I expected to see some consumer lag here, since processing failed. What is wrong?

 

enable.auto.commit : false

 

 

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch the Kafka offsets of this batch so they can be committed manually later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {

        // filter logic here

    }).foreachPartition(kafkaRecords -> {

        // initializing DB connections

        while (kafkaRecords.hasNext()) {

            // doing some work here

            // -----> EXCEPTION
            throw new Exception();
        }

    });

    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("------------- Unable to commit offsets, something went wrong, trace ------------ " + exception.getCause());
            exception.printStackTrace(); // need this for driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset info: partition " + offsetRange.partition()
                        + ", fromOffset " + offsetRange.fromOffset()
                        + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
});
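
To make the expectation concrete, here is a minimal plain-Java sketch of the behaviour I am after: the committed offset should move only when every record in the batch was processed without an exception. It uses no Spark or Kafka classes. CommitOnSuccessSketch, processBatch and committedOffset are hypothetical names made up for illustration, and the committedOffset field is only a stand-in for the consumer group's committed offset, not the real commit API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation only; nothing here is part of Spark or Kafka.
class CommitOnSuccessSketch {

    // Hypothetical stand-in for the consumer group's committed offset.
    long committedOffset = 0L;

    /**
     * Processes one batch covering offsets [fromOffset, fromOffset + records.size()).
     * The committed offset advances only if no record threw an exception.
     * Returns true if the batch was committed, false otherwise.
     */
    boolean processBatch(long fromOffset, List<String> records, Consumer<String> work) {
        long untilOffset = fromOffset + records.size();
        try {
            for (String record : records) {
                work.accept(record); // may throw, like the loop in the question
            }
        } catch (RuntimeException e) {
            // Processing failed: leave committedOffset untouched so the batch can be re-read.
            System.out.println("Batch failed, offsets not committed: " + e.getMessage());
            return false;
        }
        // Reached only when the whole batch succeeded.
        committedOffset = untilOffset;
        return true;
    }

    public static void main(String[] args) {
        CommitOnSuccessSketch sketch = new CommitOnSuccessSketch();
        List<String> batch = Arrays.asList("a", "b", "c");

        // Every record fails, as in the question.
        sketch.processBatch(0L, batch, r -> { throw new IllegalStateException("simulated failure"); });
        System.out.println("Committed offset after failed batch: " + sketch.committedOffset);

        // The same batch is retried and succeeds.
        sketch.processBatch(0L, batch, r -> { });
        System.out.println("Committed offset after successful batch: " + sketch.committedOffset);
    }
}
```

With this sketch the committed offset stays at 0 after the failed batch and moves to 3 only after the retry succeeds. That gap is the lag I expected to see in the real job.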

 
