<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Incompatible Kafka Version in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383666#M245007</link>
    <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/108964"&gt;@yanseoi&lt;/a&gt;,&amp;nbsp;Welcome to our community! To help you get the best possible answer, I have tagged our Kafka experts&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/81193"&gt;@Babasaheb&lt;/a&gt;&amp;nbsp;and&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/34237"&gt;@Nidhin&lt;/a&gt;, who may be able to assist you further.&lt;BR /&gt;&lt;BR /&gt;Please feel free to provide any additional details about your query. We hope you find a satisfactory solution to your question.&lt;/P&gt;</description>
    <pubDate>Wed, 21 Feb 2024 05:19:34 GMT</pubDate>
    <dc:creator>VidyaSargur</dc:creator>
    <dc:date>2024-02-21T05:19:34Z</dc:date>
    <item>
      <title>Incompatible Kafka Version</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383650#M245006</link>
      <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;We ran into an issue when using Flink version 1.16.2.CSA.1.11.0.x. This is the error message:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;2024-02-19 16:52:42
java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=1077051, epoch=0, transactionalId=payload_migs-TEST_mig_2-0-3}
  at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
  at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
  at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
  at java.base/java.util.Optional.orElseGet(Optional.java:369)
  at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
  ... 16 more
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
  at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
  ... 22 more&lt;/LI-CODE&gt;&lt;P&gt;Our environment, for reference: CDP 7.1.9 with CSA 1.11.&lt;/P&gt;</description>
      <pubDate>Wed, 21 Feb 2024 04:04:01 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383650#M245006</guid>
      <dc:creator>yanseoi</dc:creator>
      <dc:date>2024-02-21T04:04:01Z</dc:date>
    </item>
    <item>
      <title>Re: Incompatible Kafka Version</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383666#M245007</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/108964"&gt;@yanseoi&lt;/a&gt;,&amp;nbsp;Welcome to our community! To help you get the best possible answer, I have tagged our Kafka experts&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/81193"&gt;@Babasaheb&lt;/a&gt;&amp;nbsp;and&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/34237"&gt;@Nidhin&lt;/a&gt;, who may be able to assist you further.&lt;BR /&gt;&lt;BR /&gt;Please feel free to provide any additional details about your query. We hope you find a satisfactory solution to your question.&lt;/P&gt;</description>
      <pubDate>Wed, 21 Feb 2024 05:19:34 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383666#M245007</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-02-21T05:19:34Z</dc:date>
    </item>
    <item>
      <title>Re: Incompatible Kafka Version</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383668#M245008</link>
      <description>&lt;P&gt;Thank you, Vidya.&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;We currently run a Flink job in a Cloudera CDP 7.1.9 environment with Flink version 1.16.2-csa1.11.0.2. The job reads data from Kafka and writes it back to Kafka.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;The Kafka configuration we are using is as follows:&lt;BR /&gt;kafka.group.id=TEST_mig_2&lt;BR /&gt;kafka.flink.partition-discovery.interval-millis=120000&lt;BR /&gt;kafka.retries=5&lt;BR /&gt;kafka.linger.ms=500&lt;BR /&gt;kafka.retry.backoff.ms=100&lt;BR /&gt;kafka.compression.type=lz4&lt;BR /&gt;kafka.request.timeout.ms=180000&lt;BR /&gt;kafka.delivery.timeout.ms=302000&lt;BR /&gt;kafka.transaction.timeout.ms=900000&lt;BR /&gt;kafka.auto.offset.reset=earliest&lt;BR /&gt;kafka.isolation.level=read_committed&lt;BR /&gt;&lt;BR /&gt;The Kafka writer code we are using is:&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;public class WriterModule implements Write&amp;lt;String, String&amp;gt; {

    @Override
    public DataStream&amp;lt;String&amp;gt; write(
            DataStream&amp;lt;String&amp;gt; transactionStream, ParameterTool params, StreamExecutionEnvironment env) {
        String outputTopic = params.get("app.topic.output");
        Properties kafkaProps = Utils.readKafkaPropertiesSink(params);
        String appName = params.get("app.name", "Generic");
        int sinkParallelism = params.getInt("app.input.sink.parallelism");
        KafkaSink&amp;lt;String&amp;gt; streamOutput = KafkaSink.&amp;lt;String&amp;gt;builder()
                .setBootstrapServers(params.get(Utils.KAFKA_BOOTSTRAP_SERVERS_SINK))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(kafkaProps)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(appName + "-" + outputTopic)
                .build();
        transactionStream
                .sinkTo(streamOutput)
                .setParallelism(sinkParallelism)
                .name("trxid" + appName + outputTopic + "dup")
                .uid(appName + "-" + outputTopic);

        return transactionStream;
    }
}&lt;/LI-CODE&gt;</description>
      <pubDate>Wed, 21 Feb 2024 05:33:42 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383668#M245008</guid>
      <dc:creator>yanseoi</dc:creator>
      <dc:date>2024-02-21T05:33:42Z</dc:date>
    </item>
    <item>
      <title>Re: Incompatible Kafka Version</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383675#M245009</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/108964"&gt;@yanseoi&lt;/a&gt;, you are encountering the same issue that was discussed on the Flink mailing list:&lt;/P&gt;&lt;P&gt;&lt;A href="https://lists.apache.org/thread/07d46txb6vttw7c8oyr6z4n676vgqh28" target="_blank"&gt;https://lists.apache.org/thread/07d46txb6vttw7c8oyr6z4n676vgqh28&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Your stack trace shows the Flink Kafka connector failing a reflective lookup of the topicPartitionBookkeeper field, which suggests that the Kafka client on the classpath is not the version the connector expects. The problem is tracked in:&lt;/P&gt;&lt;P&gt;&lt;A href="https://issues.apache.org/jira/browse/FLINK-29978" target="_blank"&gt;https://issues.apache.org/jira/browse/FLINK-29978&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://issues.apache.org/jira/browse/FLINK-29977" target="_blank"&gt;https://issues.apache.org/jira/browse/FLINK-29977&lt;/A&gt;&lt;/P&gt;&lt;P&gt;It was fixed with a Kafka client version upgrade in:&lt;/P&gt;&lt;P&gt;&lt;A href="https://issues.apache.org/jira/browse/FLINK-31599" target="_blank"&gt;https://issues.apache.org/jira/browse/FLINK-31599&lt;/A&gt;&lt;/P&gt;&lt;P&gt;The same change should be included in CSA 1.12.0, so please try upgrading:&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.cloudera.com/csa/1.12.0/installation/topics/csa-upgrade-artifacts.html" target="_blank"&gt;https://docs.cloudera.com/csa/1.12.0/installation/topics/csa-upgrade-artifacts.html&lt;/A&gt;&lt;/P&gt;</description>
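      <!--
Editorial note: the stack trace in this thread ends in Class.getDeclaredField throwing
NoSuchFieldException for "topicPartitionBookkeeper". The sketch below shows, under stated
assumptions, how the same kind of reflective lookup could be used to probe a classpath
before deploying a job. FieldProbe, StandIn, declaresField, and probe are hypothetical
names introduced here for illustration. The Kafka class name passed to probe in main is
an assumption about where the connector looks for the field; the thread itself does not
name the class. This is not the connector's own code.

```java
import java.lang.reflect.Field;

public class FieldProbe {

    /** Hypothetical stand-in class; it is not part of Kafka or Flink. */
    static class StandIn {
        private final String present = "x";
    }

    /** Returns true if the class itself declares a field with the given name. */
    static boolean declaresField(Class<?> type, String fieldName) {
        try {
            Field field = type.getDeclaredField(fieldName);
            return field != null;
        } catch (NoSuchFieldException e) {
            // Same exception type that appears at the bottom of the stack trace.
            return false;
        }
    }

    /** Looks a class up by name and reports whether it declares the field. */
    static String probe(String className, String fieldName) {
        try {
            Class<?> type = Class.forName(className);
            return declaresField(type, fieldName) ? "field present" : "field missing";
        } catch (ClassNotFoundException e) {
            return "class not on classpath";
        }
    }

    public static void main(String[] args) {
        // Demonstration against the stand-in class, which needs no Kafka jars.
        System.out.println(declaresField(StandIn.class, "present"));
        System.out.println(declaresField(StandIn.class, "topicPartitionBookkeeper"));

        // Assumed class name; run with the job's classpath to see which case applies.
        System.out.println(probe(
                "org.apache.kafka.clients.producer.internals.TransactionManager",
                "topicPartitionBookkeeper"));
    }
}
```

Running main with the same classpath as the Flink job would indicate whether the bundled
Kafka client still declares the field. A "field missing" result would be consistent with
the version mismatch described in the linked JIRA issues.
      -->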
      <pubDate>Wed, 21 Feb 2024 06:42:14 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/383675#M245009</guid>
      <dc:creator>Yuexin Zhang</dc:creator>
      <dc:date>2024-02-21T06:42:14Z</dc:date>
    </item>
    <item>
      <title>Re: Incompatible Kafka Version</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/384484#M245385</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/108964"&gt;@yanseoi&lt;/a&gt;,&amp;nbsp;Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.&lt;/P&gt;</description>
      <pubDate>Mon, 04 Mar 2024 07:32:06 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Incompatible-Kafka-Version/m-p/384484#M245385</guid>
      <dc:creator>VidyaSargur</dc:creator>
      <dc:date>2024-03-04T07:32:06Z</dc:date>
    </item>
  </channel>
</rss>

