02-20-2024
09:33 PM
Thank you, Vidya. We currently run a Flink job on Cloudera CDP 7.1.9 with Flink version 1.16.2-csa1.11.0.2. The job reads data from Kafka and writes it back to Kafka. The Kafka configuration we are using is as follows:

kafka.group.id=TEST_mig_2
kafka.flink.partition-discovery.interval-millis=120000
kafka.retries=5
kafka.linger.ms=500
kafka.retry.backoff.ms=100
kafka.compression.type=lz4
kafka.request.timeout.ms=180000
kafka.delivery.timeout.ms=302000
kafka.transaction.timeout.ms=900000
kafka.auto.offset.reset=earliest
kafka.isolation.level=read_committed

and the Kafka writer snippet we are using is:

public class WriterModule implements Write<String, String> {

    @Override
    public DataStream<String> write(
            DataStream<String> transactionStream, ParameterTool params, StreamExecutionEnvironment env) {
        String outputTopic = params.get("app.topic.output");
        Properties kafkaProps = Utils.readKafkaPropertiesSink(params);
        String appName = params.get("app.name", "Generic");
        int sinkParallelism = params.getInt("app.input.sink.parallelism");

        KafkaSink<String> streamOutput = KafkaSink.<String>builder()
                .setBootstrapServers(params.get(Utils.KAFKA_BOOTSTRAP_SERVERS_SINK))
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setKafkaProducerConfig(kafkaProps)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(appName + "-" + outputTopic)
                .build();

        transactionStream.sinkTo(streamOutput)
                .setParallelism(sinkParallelism)
                .name("trxid" + appName + outputTopic + "dup")
                .uid(appName + "-" + outputTopic);
        return transactionStream;
    }
}
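For context, EXACTLY_ONCE with KafkaSink commits its Kafka transactions when a checkpoint completes, so checkpointing must be enabled on the job, and the producer-side transaction timeout must not exceed the broker-side maximum. A minimal sketch of the relevant settings (the checkpoint interval below is illustrative, not taken from our job):

```
# Assumed, not from the original post: exactly-once KafkaSink requires
# checkpointing, since transactions are only committed on checkpoint completion.
execution.checkpointing.interval: 60 s

# kafka.transaction.timeout.ms (900000 above) must be <= the broker-side
# transaction.max.timeout.ms (Kafka's broker default is also 900000 ms / 15 min),
# otherwise the transactional producer fails at initialization.
```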
02-20-2024
08:04 PM
Hi everyone, we ran into an issue when using Flink version 1.16.2-csa1.11.0.x. This is the error message:

2024-02-19 16:52:42
java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=1077051, epoch=0, transactionalId=payload_migs-TEST_mig_2-0-3}
at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
... 16 more
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
... 22 more

For reference, our environment is CDP 7.1.9 with CSA 1.11.
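To illustrate the failure mode: the "Incompatible KafkaProducer version" error is raised by FlinkKafkaInternalProducer.getField, which resumes transactions by reflectively reading private fields of the Kafka client's internal TransactionManager. If the kafka-clients jar on the classpath is newer than the one the connector was built against, the expected field (topicPartitionBookkeeper, per the NoSuchFieldException above) no longer exists under that name, so checking which kafka-clients version is actually on the job classpath is a reasonable first step. Below is a minimal, hedged sketch of the mechanism; the two stand-in classes are hypothetical, not the real Kafka internals:

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for the client layout the connector was compiled against.
class OldTransactionManager {
    private final Object topicPartitionBookkeeper = new Object(); // field the connector expects
}

// Hypothetical stand-in for a newer client where the field was renamed/removed.
class NewTransactionManager {
    private final Object renamedBookkeeper = new Object(); // name is illustrative only
}

class ReflectionMismatchDemo {

    // Mirrors the shape of FlinkKafkaInternalProducer.getField: look up a
    // private field by name and wrap any failure, as seen in the stack trace.
    public static Object getField(Object target, String name) {
        try {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true);
            return f.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    public static void main(String[] args) {
        // Succeeds against the "old" layout the reflective code expects.
        System.out.println(getField(new OldTransactionManager(), "topicPartitionBookkeeper") != null);

        // Fails against the "new" layout, reproducing the reported exception chain:
        // RuntimeException("Incompatible KafkaProducer version") caused by NoSuchFieldException.
        try {
            getField(new NewTransactionManager(), "topicPartitionBookkeeper");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

If the classpath does carry a mismatched kafka-clients jar, aligning it with the version bundled by the CSA flink-connector-kafka artifact is the usual remedy.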
Labels:
- Apache Flink
- Apache Kafka