Hi team,
We faced an issue while working on Json transformation using Flink after consuming Kafka upcoming data stream. The script developed using Scala.
-> The services details are:
1. Server Kafka-console-consumer version: 2.5.0.7.1.7.1000-141
2. Server Flink version: 14.14.0-csa1.6.2.0
3. Server Scala version: 12.13.10
-> The error is:
java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
-> The Scala scripts (running using SBT):
1. SBT file (build.sbt) :
scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" % "1.13.2-csa1.5.3.2" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0-csa1.6.2.0" % "provided"
resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
2. Main Object (Data stream transformation) :
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties
object MyFlinkJob {
def main(args: Array[String]): Unit = {
// Define Kafka topics
val cdr_topic = "my_topic"
// Prepare the Flink environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Prepare Kafka consumer properties
System.setProperty("java.security.auth.login.config", " ")
val properties: Properties = new Properties()
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
// Security settings
properties.setProperty("security.protocol", " ")
properties.setProperty("sasl.mechanism", " ")
properties.setProperty("sasl.kerberos.service.name", "kafka")
properties.setProperty("ssl.truststore.location", " ")
properties.setProperty("ssl.truststore.password", " ")
// Create Kafka consumer
val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)
// Create a data stream for incoming messages
val cdrStream: DataStream[String] = env.addSource(cdrConsumer)
// Process and print the messages
cdrStream.print()
// Execute the Flink job
env.execute("kafka_flink_Data_Stream_Processor")}}