<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Kafka-Flink connection in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/Kafka-Flink-connection/m-p/378739#M243648</link>
    <description>&lt;DIV&gt;&lt;SPAN&gt;Hi team,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;We faced an issue &lt;/SPAN&gt;&lt;SPAN&gt;while&lt;/SPAN&gt;&lt;SPAN&gt; working on a JSON transformation using Flink after consuming an incoming Kafka data stream. The script was developed using Scala.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The service details are:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;. Server Kafka-console-consumer version: &lt;/SPAN&gt;&lt;STRONG&gt;2.5.0.7.1.7.1000-141&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;2&lt;/SPAN&gt;&lt;SPAN&gt;. Server Flink version: &lt;/SPAN&gt;&lt;STRONG&gt;14.14.0-csa1.6.2.0&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;3&lt;/SPAN&gt;&lt;SPAN&gt;. Server Scala version: &lt;/SPAN&gt;&lt;STRONG&gt;12.13.10&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The error is:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The Scala scripts (run using SBT):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;. SBT file (build.sbt) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" %  "1.13.2-csa1.5.3.2" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0-csa1.6.2.0" % "provided"
resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;2&lt;/SPAN&gt;&lt;SPAN&gt;. Main Object (Data stream transformation) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
// Define Kafka topics
val cdr_topic = "my_topic"

// Prepare the Flink environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Prepare Kafka consumer properties
System.setProperty("java.security.auth.login.config", " ")
val properties: Properties = new Properties()
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

// Security settings
properties.setProperty("security.protocol", " ")
properties.setProperty("sasl.mechanism", " ")
properties.setProperty("sasl.kerberos.service.name", "kafka")
properties.setProperty("ssl.truststore.location", " ")
properties.setProperty("ssl.truststore.password", " ")

// Create Kafka consumer
val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)

// Create a data stream for incoming messages
val cdrStream: DataStream[String] = env.addSource(cdrConsumer)

// Process and print the messages
cdrStream.print()
// Execute the Flink job
env.execute("kafka_flink_Data_Stream_Processor")}}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Wed, 08 Nov 2023 05:37:16 GMT</pubDate>
    <dc:creator>I_Mesh01</dc:creator>
    <dc:date>2023-11-08T05:37:16Z</dc:date>
    <item>
      <title>Kafka-Flink connection</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-Flink-connection/m-p/378739#M243648</link>
      <description>&lt;DIV&gt;&lt;SPAN&gt;Hi team,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;We faced an issue &lt;/SPAN&gt;&lt;SPAN&gt;while&lt;/SPAN&gt;&lt;SPAN&gt; working on a JSON transformation using Flink after consuming an incoming Kafka data stream. The script was developed using Scala.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The service details are:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;. Server Kafka-console-consumer version: &lt;/SPAN&gt;&lt;STRONG&gt;2.5.0.7.1.7.1000-141&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;2&lt;/SPAN&gt;&lt;SPAN&gt;. Server Flink version: &lt;/SPAN&gt;&lt;STRONG&gt;14.14.0-csa1.6.2.0&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;3&lt;/SPAN&gt;&lt;SPAN&gt;. Server Scala version: &lt;/SPAN&gt;&lt;STRONG&gt;12.13.10&lt;/STRONG&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The error is:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;-&amp;gt; The Scala scripts (run using SBT):&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;1&lt;/SPAN&gt;&lt;SPAN&gt;. SBT file (build.sbt) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" %  "1.13.2-csa1.5.3.2" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0-csa1.6.2.0" % "provided"
resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;2&lt;/SPAN&gt;&lt;SPAN&gt;. Main Object (Data stream transformation) :&lt;/SPAN&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
// Define Kafka topics
val cdr_topic = "my_topic"

// Prepare the Flink environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Prepare Kafka consumer properties
System.setProperty("java.security.auth.login.config", " ")
val properties: Properties = new Properties()
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

// Security settings
properties.setProperty("security.protocol", " ")
properties.setProperty("sasl.mechanism", " ")
properties.setProperty("sasl.kerberos.service.name", "kafka")
properties.setProperty("ssl.truststore.location", " ")
properties.setProperty("ssl.truststore.password", " ")

// Create Kafka consumer
val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)

// Create a data stream for incoming messages
val cdrStream: DataStream[String] = env.addSource(cdrConsumer)

// Process and print the messages
cdrStream.print()
// Execute the Flink job
env.execute("kafka_flink_Data_Stream_Processor")}}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 08 Nov 2023 05:37:16 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-Flink-connection/m-p/378739#M243648</guid>
      <dc:creator>I_Mesh01</dc:creator>
      <dc:date>2023-11-08T05:37:16Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka-Flink connection</title>
      <link>https://community.cloudera.com/t5/Support-Questions/Kafka-Flink-connection/m-p/380028#M243983</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/106003"&gt;@I_Mesh01&lt;/a&gt;&amp;nbsp;This error is stating that your topic schema and Flink schema do not match.&amp;nbsp; &amp;nbsp;I can't see much above related to the shape of the data or the schema, but this is what you should be looking at.&amp;nbsp; &amp;nbsp;Conflicts in data/column types, missing data, etc., will result in deserialization errors.&lt;/P&gt;</description>
      <pubDate>Fri, 01 Dec 2023 14:05:27 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/Kafka-Flink-connection/m-p/380028#M243983</guid>
      <dc:creator>steven-matison</dc:creator>
      <dc:date>2023-12-01T14:05:27Z</dc:date>
    </item>
  </channel>
</rss>

