Created 02-20-2024 08:04 PM
Hi everyone,
We are facing an issue when using Flink version 1.16.2.CSA.1.11.0.x. This is the error message:
2024-02-19 16:52:42
java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=1077051, epoch=0, transactionalId=payload_migs-TEST_mig_2-0-3}
at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
... 16 more
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
... 22 more
Our environment is CDP 7.1.9 with CSA 1.11.
Created 02-20-2024 09:19 PM
@yanseoi, Welcome to our community! To help you get the best possible answer, I have tagged our Kafka experts @Babasaheb @Nidhin, who may be able to assist you further.
Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.
Regards,
Vidya Sargur,
Created 02-20-2024 09:33 PM
Thank you, Vidya.
Currently, we have a Flink job in a Cloudera CDP 7.1.9 environment with Flink version 1.16.2-csa1.11.0.2. We are attempting to create a Flink job that reads data from Kafka and writes it back to Kafka.
The Kafka configuration we are using is as follows:
kafka.group.id=TEST_mig_2
kafka.flink.partition-discovery.interval-millis=120000
kafka.retries=5
kafka.linger.ms=500
kafka.retry.backoff.ms=100
kafka.compression.type=lz4
kafka.request.timeout.ms=180000
kafka.delivery.timeout.ms=302000
kafka.transaction.timeout.ms=900000
kafka.auto.offset.reset=earliest
kafka.isolation.level=read_committed
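(For reference, here is a minimal sketch of what a prefix-stripping helper such as Utils.readKafkaPropertiesSink is assumed to do with these kafka.* keys; it is only an illustration under that assumption, not the actual implementation used in the job.)

import java.util.Properties;
import org.apache.flink.api.java.utils.ParameterTool;

public final class KafkaPropsSketch {
    private static final String KAFKA_PREFIX = "kafka.";

    // Copies every "kafka."-prefixed parameter into a Properties object with the
    // prefix removed, e.g. "kafka.transaction.timeout.ms" becomes
    // "transaction.timeout.ms" before being handed to the Kafka producer.
    public static Properties readKafkaProperties(ParameterTool params) {
        Properties props = new Properties();
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith(KAFKA_PREFIX)) {
                props.setProperty(key.substring(KAFKA_PREFIX.length()), params.get(key));
            }
        }
        return props;
    }
}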
The Kafka writer snippet we are using is:
public class WriterModule implements Write<String, String> {

    @Override
    public DataStream<String> write(
            DataStream<String> transactionStream, ParameterTool params, StreamExecutionEnvironment env) {
        String outputTopic = params.get("app.topic.output");
        Properties kafkaProps = Utils.readKafkaPropertiesSink(params);
        String appName = params.get("app.name", "Generic");
        int sinkParallelism = params.getInt("app.input.sink.parallelism");

        KafkaSink<String> streamOutput = KafkaSink.<String>builder()
                .setBootstrapServers(params.get(Utils.KAFKA_BOOTSTRAP_SERVERS_SINK))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(kafkaProps)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(appName + "-" + outputTopic)
                .build();

        transactionStream
                .sinkTo(streamOutput)
                .setParallelism(sinkParallelism)
                .name("trxid" + appName + outputTopic + "dup")
                .uid(appName + "-" + outputTopic);

        return transactionStream;
    }
}
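(For completeness, below is a hypothetical driver showing how a module like WriterModule could be wired into a job; the class name, checkpoint interval, and placeholder source are assumptions, not taken from the actual job. With DeliveryGuarantee.EXACTLY_ONCE, Kafka transactions are only committed when a checkpoint completes, so checkpointing has to be enabled.)

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobDriver {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EXACTLY_ONCE Kafka sinks only commit their transactions on checkpoint
        // completion, so checkpointing must be enabled for the sink to make progress.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Placeholder source; the real job would read from a Kafka input topic instead.
        DataStream<String> transactions = env.fromElements("sample-record");

        new WriterModule().write(transactions, params, env);
        env.execute(params.get("app.name", "Generic"));
    }
}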
Created 02-20-2024 10:42 PM
Hi @yanseoi, what you are encountering is the same issue discussed in the Flink community:
https://lists.apache.org/thread/07d46txb6vttw7c8oyr6z4n676vgqh28
It is caused by:
https://issues.apache.org/jira/browse/FLINK-29978
https://issues.apache.org/jira/browse/FLINK-29977
and was fixed with a Kafka client version upgrade in:
https://issues.apache.org/jira/browse/FLINK-31599
The same change should be included in CSA 1.12.0; please try:
https://docs.cloudera.com/csa/1.12.0/installation/topics/csa-upgrade-artifacts.html
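(If it helps to confirm which kafka-clients version a job actually picks up before and after the upgrade, a small standalone check like the one below will print it; this is only a diagnostic sketch, not part of the fix itself.)

import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientVersionCheck {
    public static void main(String[] args) {
        // Prints the kafka-clients version found on the classpath, e.g. "3.2.3",
        // and where that jar was loaded from.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        System.out.println("loaded from: "
                + AppInfoParser.class.getProtectionDomain().getCodeSource().getLocation());
    }
}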
Created 03-03-2024 11:32 PM
@yanseoi, Did the response assist in resolving your query? If it did, kindly mark the relevant reply as the solution, as it will aid others in locating the answer more easily in the future.
Regards,
Vidya Sargur,