11-07-2023 09:37 PM
Hi team,

We faced an issue while working on a JSON transformation using Flink, after consuming an incoming Kafka data stream. The script is developed in Scala.

The service details are:
1. Server Kafka-console-consumer version: 2.5.0.7.1.7.1000-141
2. Server Flink version: 1.14.0-csa1.6.2.0
3. Server Scala version: 2.13.10

The error is:
java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema

The Scala scripts (run using SBT):

1. SBT file (build.sbt):

scalaVersion := "2.12.18"
name := "kafka-flink"
organization := "com.me"
version := "1.0"
//libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0.7.1.7.1000-141"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.14.0-csa1.6.2.0" % "provided"
libraryDependencies += "org.apache.flink" % "flink-core" % "1.13.2-csa1.5.3.2" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0-csa1.6.2.0" % "provided"
resolvers += "Maven Central" at "https://repo1.maven.org/maven2"
resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/" 2. Main Object (Data stream transformation) : import org.apache.flink.streaming.api.scala._
2. Main Object (data stream transformation):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties
object MyFlinkJob {
  def main(args: Array[String]): Unit = {
    // Define Kafka topics
    val cdr_topic = "my_topic"

    // Prepare the Flink environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Prepare Kafka consumer properties (values redacted in this post)
    System.setProperty("java.security.auth.login.config", " ")
    val properties: Properties = new Properties()
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, " ")
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " ")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

    // Security settings (values redacted in this post)
    properties.setProperty("security.protocol", " ")
    properties.setProperty("sasl.mechanism", " ")
    properties.setProperty("sasl.kerberos.service.name", "kafka")
    properties.setProperty("ssl.truststore.location", " ")
    properties.setProperty("ssl.truststore.password", " ")

    // Create the Kafka consumer
    val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)

    // Create a data stream for incoming messages
    val cdrStream: DataStream[String] = env.addSource(cdrConsumer)

    // Process and print the messages
    cdrStream.print()

    // Execute the Flink job
    env.execute("kafka_flink_Data_Stream_Processor")
  }
}
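For reference, the missing class in the error, org.apache.flink.api.common.serialization.DeserializationSchema, ships in flink-core, and in Flink 1.14 the same package also contains the non-deprecated SimpleStringSchema (the import used above, from org.apache.flink.streaming.util.serialization, is the deprecated location). A minimal sketch of the consumer wiring with that import, using the same identifiers as the code above and assuming nothing beyond the Flink 1.14 API:

import org.apache.flink.api.common.serialization.SimpleStringSchema

val cdrConsumer = new FlinkKafkaConsumer[String](cdr_topic, new SimpleStringSchema(), properties)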
Labels:
- Apache Flink
- Apache Kafka
07-14-2023 07:48 AM
Hello all! Is there a way to use a PGR and input/output ports in the same NiFi instance? The aim is to reuse the same processor multiple times by referencing only one PGR. If not, are there any other suggestions to achieve the same idea?
Labels:
- Apache NiFi