
Kafka-Flink connection

New Contributor
Hi team,
We are facing an issue with a JSON transformation in Flink after consuming an incoming Kafka data stream. The script is developed in Scala.
 
-> The service details are:
1. Server Kafka-console-consumer version: 2.5.0.7.1.7.1000-141
2. Server Flink version: 1.14.0-csa1.6.2.0
3. Server Scala version: 2.13.10
 
-> The error is:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema

-> The Scala scripts (run using SBT):
1. SBT file (build.sbt):

scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" %  "1.13.2-csa1.5.3.2" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0-csa1.6.2.0" % "provided"
resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

2. Main Object (data stream transformation):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
    // Define the Kafka topic
    val cdr_topic = "my_topic"

    // Prepare the Flink environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Prepare Kafka consumer properties (sensitive values redacted)
    System.setProperty("java.security.auth.login.config", " ")
    val properties: Properties = new Properties()
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

    // Security settings (values redacted)
    properties.setProperty("security.protocol", " ")
    properties.setProperty("sasl.mechanism", " ")
    properties.setProperty("sasl.kerberos.service.name", "kafka")
    properties.setProperty("ssl.truststore.location", " ")
    properties.setProperty("ssl.truststore.password", " ")

    // Create the Kafka consumer
    val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)

    // Create a data stream for incoming messages
    val cdrStream: DataStream[String] = env.addSource(cdrConsumer)

    // Process and print the messages
    cdrStream.print()

    // Execute the Flink job
    env.execute("kafka_flink_Data_Stream_Processor")
  }
}
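
One detail worth flagging in the script above: SimpleStringSchema is imported from org.apache.flink.streaming.util.serialization, which is deprecated in Flink 1.14. The same class also lives in org.apache.flink.api.common.serialization, the package of the DeserializationSchema class named in the error, so an alternative wiring (same cdr_topic and properties as above) would be:

// Same consumer, but with SimpleStringSchema imported from its current
// home in flink-core rather than the deprecated streaming.util package.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)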

1 REPLY

@I_Mesh01 This error indicates that your topic schema and your Flink deserialization schema do not match. I can't see much above about the shape of the data or its schema, but that is where you should be looking: conflicts in data/column types, missing data, and the like will result in deserialization errors.
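
If the mismatch really is in the message shape, a custom deserialization schema makes the failure point explicit instead of failing somewhere inside the connector. A minimal sketch (the Cdr case class and CdrSchema are hypothetical names, and the parsing is deliberately naive; a real job would parse the JSON fields and fail with a clear error when one is missing or mistyped):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical record type; the real field layout depends on the topic's JSON.
case class Cdr(raw: String)

class CdrSchema extends DeserializationSchema[Cdr] {
  // Decode the Kafka record bytes into a Cdr; this is where a shape
  // mismatch between the topic data and the expected schema surfaces.
  override def deserialize(message: Array[Byte]): Cdr =
    Cdr(new String(message, StandardCharsets.UTF_8))

  // This is an unbounded stream, so no element ever marks the end.
  override def isEndOfStream(nextElement: Cdr): Boolean = false

  override def getProducedType: TypeInformation[Cdr] =
    TypeInformation.of(classOf[Cdr])
}

// Usage: new FlinkKafkaConsumer[Cdr](cdr_topic, new CdrSchema(), properties)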