Created 11-07-2023 09:37 PM
java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"

// Keep ALL Flink artifacts on the same version. The original build mixed
// flink-core 1.13.2-csa1.5.3.2 with 1.14.0-csa1.6.2.0 modules, which is
// binary-incompatible and produces runtime errors such as
// NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
// (DeserializationSchema is defined in flink-core).
val flinkVersion = "1.14.0-csa1.6.2.0"

// "provided" is correct for the core/runtime artifacts: the Flink cluster
// supplies them on the classpath at run time.
libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
// The Kafka connector is NOT shipped with the Flink distribution — it must be
// bundled into the job (fat) jar, so it stays in the default compile scope.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
// Previously pinned to 1.13.2-csa1.5.3.2 — aligned with the other modules.
libraryDependencies += "org.apache.flink" % "flink-core" % flinkVersion % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % flinkVersion % "provided"

resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
// FIX: SimpleStringSchema moved. The old location
// org.apache.flink.streaming.util.serialization.SimpleStringSchema is deprecated
// (and absent in newer builds); in Flink 1.14 the supported class lives in
// org.apache.flink.api.common.serialization and implements the
// DeserializationSchema interface named in the reported NoClassDefFoundError.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

import java.util.Properties

/**
 * Flink streaming job that consumes string records from a Kafka topic
 * over a SASL/SSL (Kerberos) channel and prints each record to stdout.
 *
 * NOTE(review): several property values below are single-space placeholders
 * (" ") — group id, bootstrap servers, security protocol, SASL mechanism,
 * truststore location/password, and the JAAS config path. They must be filled
 * in with real values before the job can connect to a secured cluster.
 */
object MyFlinkJob {

  def main(args: Array[String]): Unit = {
    // Kafka topic to consume from.
    val cdr_topic = "my_topic"

    // Obtain the streaming execution environment (local or cluster-provided).
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Path to the JAAS configuration used for Kerberos authentication.
    // Placeholder value — must point at a real jaas.conf.
    System.setProperty("java.security.auth.login.config", " ")

    // Kafka consumer configuration.
    val properties: Properties = new Properties()
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): when Flink checkpointing is enabled the connector manages
    // offsets itself and these auto-commit settings are ignored — confirm
    // which offset strategy is intended for this job.
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

    // Security settings (placeholders — e.g. SASL_SSL / GSSAPI in a
    // Kerberized Cloudera cluster).
    properties.setProperty("security.protocol", " ")
    properties.setProperty("sasl.mechanism", " ")
    properties.setProperty("sasl.kerberos.service.name", "kafka")
    properties.setProperty("ssl.truststore.location", " ")
    properties.setProperty("ssl.truststore.password", " ")

    // Source: Kafka consumer deserializing each record's value as a UTF-8 string.
    val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)

    // Build the stream and print every record (goes to TaskManager stdout).
    val cdrStream: DataStream[String] = env.addSource(cdrConsumer)
    cdrStream.print()

    // Trigger lazy execution of the assembled dataflow.
    env.execute("kafka_flink_Data_Stream_Processor")
  }
}
Created 12-01-2023 06:05 AM
@I_Mesh01 This error suggests that your topic schema and Flink schema do not match. I can't see much above related to the shape of the data or the schema, but this is what you should be looking at. Conflicts in data/column types, missing data, etc., will result in deserialization errors. (Note: a `NoClassDefFoundError` can also indicate a classpath or dependency-version mismatch — here, `DeserializationSchema` lives in `flink-core`, which is pinned to a different version than the other Flink artifacts — rather than a data-level schema conflict.)