Incompatible Kafka Version

New Contributor

Hi everyone,

We ran into an issue when using Flink version 1.16.2.CSA.1.11.0.x. This is the error message:

2024-02-19 16:52:42
java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=1077051, epoch=0, transactionalId=payload_migs-TEST_mig_2-0-3}
  at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
  at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
  at java.base/java.util.Optional.orElseGet(Optional.java:369)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
  ... 16 more
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
  at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
  ... 22 more

For reference, our environment is CDP 7.1.9 with CSA 1.11.

Community Manager

@yanseoi, Welcome to our community! To help you get the best possible answer, I have tagged our Kafka experts @Babasaheb and @Nidhin, who may be able to assist you further.

Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

New Contributor

Thank you, Vidya.

Currently, we have a Flink job in the Cloudera CDP 7.1.9 environment with Flink Version: 1.16.2-csa1.11.0.2. We are attempting to create a Flink job to read data from Kafka and send it back to Kafka.


The Kafka configuration we are using is as follows:
kafka.group.id=TEST_mig_2
kafka.flink.partition-discovery.interval-millis=120000
kafka.retries=5
kafka.linger.ms=500
kafka.retry.backoff.ms=100
kafka.compression.type=lz4
kafka.request.timeout.ms=180000
kafka.delivery.timeout.ms=302000
kafka.transaction.timeout.ms=900000
kafka.auto.offset.reset=earliest
kafka.isolation.level=read_committed

The Kafka writer snippet we are using is:

 

public class WriterModule implements Write<String, String> {

    @Override
    public DataStream<String> write(
            DataStream<String> transactionStream, ParameterTool params, StreamExecutionEnvironment env) {
        String outputTopic = params.get("app.topic.output");
        Properties kafkaProps = Utils.readKafkaPropertiesSink(params);
        String appName = params.get("app.name", "Generic");
        int sinkParallelism = params.getInt("app.input.sink.parallelism");

        // Exactly-once sink: records are written inside Kafka transactions
        // that are committed when a checkpoint completes.
        KafkaSink<String> streamOutput = KafkaSink.<String>builder()
                .setBootstrapServers(params.get(Utils.KAFKA_BOOTSTRAP_SERVERS_SINK))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(kafkaProps)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(appName + "-" + outputTopic)
                .build();

        transactionStream
                .sinkTo(streamOutput)
                .setParallelism(sinkParallelism)
                .name("trxid" + appName + outputTopic + "dup")
                .uid(appName + "-" + outputTopic);

        return transactionStream;
    }
}

 

 

Expert Contributor

Hi @yanseoi, what you are encountering is the same issue that was discussed on the Flink mailing list:

https://lists.apache.org/thread/07d46txb6vttw7c8oyr6z4n676vgqh28

It is caused by:

https://issues.apache.org/jira/browse/FLINK-29978

https://issues.apache.org/jira/browse/FLINK-29977

It was fixed with a Kafka client version upgrade in:

https://issues.apache.org/jira/browse/FLINK-31599

The same change should be included in CSA 1.12.0, so please try upgrading:

https://docs.cloudera.com/csa/1.12.0/installation/topics/csa-upgrade-artifacts.html
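
To confirm the mismatch before upgrading, a small reflection check along the following lines may help. It is only a sketch, not part of Flink or CSA. The stack trace shows the connector looking up a private field named topicPartitionBookkeeper by reflection and failing. The check below assumes the lookup targets org.apache.kafka.clients.producer.internals.TransactionManager and that newer kafka-clients releases call the field txnPartitionMap; both names are assumptions to verify against your jars. The class KafkaClientFieldCheck, its methods, and the two nested stand-in classes are hypothetical names made up for this illustration.

```java
import java.lang.reflect.Field;
import java.security.CodeSource;

public class KafkaClientFieldCheck {

    // Hypothetical stand-ins for two releases of a library class whose
    // private field was renamed. They are not real Kafka classes.
    static class OldInternals { private Object topicPartitionBookkeeper; }
    static class NewInternals { private Object txnPartitionMap; }

    /** Returns true if the class itself declares a field with this name. */
    static boolean hasDeclaredField(Class<?> type, String fieldName) {
        try {
            Field field = type.getDeclaredField(fieldName);
            return field != null;
        } catch (NoSuchFieldException e) {
            // The same exception that appears at the bottom of the stack trace.
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or "unknown". */
    static String loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown";
        }
        return source.getLocation().toString();
    }

    /** Looks a class up by name, so this file compiles without kafka-clients. */
    static String describe(String className, String fieldName) {
        try {
            Class<?> type = Class.forName(
                    className, false, KafkaClientFieldCheck.class.getClassLoader());
            String state = hasDeclaredField(type, fieldName) ? "present" : "missing";
            return className + " from " + loadedFrom(type)
                    + ": field '" + fieldName + "' is " + state;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " is not on the classpath";
        }
    }

    public static void main(String[] args) {
        // Assumed class name, taken from the Kafka client internals.
        String tm = "org.apache.kafka.clients.producer.internals.TransactionManager";
        System.out.println(describe(tm, "topicPartitionBookkeeper"));
        System.out.println(describe(tm, "txnPartitionMap")); // assumed newer name
    }
}
```

Run it with the same classpath as the job (for example, from inside the job jar or with the cluster's Flink lib directory on the classpath). If topicPartitionBookkeeper is reported as missing, the loaded kafka-clients jar is likely newer than what the connector in CSA 1.11 expects, which would match the error above.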

Community Manager

@yanseoi, Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.



Regards,

Vidya Sargur,
Community Manager

