Created 12-18-2017 08:44 AM
0favorite | I am writing a Flink Kafka integration program as below but getting timeout error for kafka : <code>import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010} import org.apache.flink.streaming.util.serialization.SimpleStringSchema import java.util.Properties object StreamKafkaProducer { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("zookeeper.connect", "localhost:2181") properties.setProperty("serializer.class", "kafka.serializer.StringEncoder") val stream: DataStream[String] =env.fromElements( ("Adam"), ("Sarah")) val kafkaProducer = new FlinkKafkaProducer010[String]( "localhost:9092", "output", new SimpleStringSchema ) // write data into Kafka stream.addSink(kafkaProducer) env.execute("Flink kafka integration ") } } From terminal I can see kafka and zookeeper are running but when I run above program from Intellij it is showing this error : <code>C:\Users\amdass\workspace\flink-project-master>sbt run Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0 [info] Loading project definition from C:\Users\amdass\workspace\flink- project-master\project [info] Set current project to Flink Project (in build file:/C:/Users/amdass/workspace/flink-project-master/) [info] Compiling 1 Scala source to C:\Users\amdass\workspace\flink-project- master\target\scala-2.11\classes... [info] Running org.example.StreamKafkaProducer SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-563113020] with leader session id 5a637740-5c73-4f69-a19e-c8ef7141efa1. 12/15/2017 14:41:49 Job execution switched to status RUNNING. 12/15/2017 14:41:49 Source: Collection Source(1/1) switched to SCHEDULED 12/15/2017 14:41:49 Sink: Unnamed(1/4) switched to SCHEDULED 12/15/2017 14:41:49 Sink: Unnamed(2/4) switched to SCHEDULED 12/15/2017 14:41:49 Sink: Unnamed(3/4) switched to SCHEDULED 12/15/2017 14:41:49 Sink: Unnamed(4/4) switched to SCHEDULED 12/15/2017 14:41:49 Source: Collection Source(1/1) switched to DEPLOYING 12/15/2017 14:41:49 Sink: Unnamed(1/4) switched to DEPLOYING 12/15/2017 14:41:49 Sink: Unnamed(2/4) switched to DEPLOYING 12/15/2017 14:41:49 Sink: Unnamed(3/4) switched to DEPLOYING 12/15/2017 14:41:49 Sink: Unnamed(4/4) switched to DEPLOYING 12/15/2017 14:41:50 Source: Collection Source(1/1) switched to RUNNING 12/15/2017 14:41:50 Sink: Unnamed(2/4) switched to RUNNING 12/15/2017 14:41:50 Sink: Unnamed(4/4) switched to RUNNING 12/15/2017 14:41:50 Sink: Unnamed(3/4) switched to RUNNING 12/15/2017 14:41:50 Sink: Unnamed(1/4) switched to RUNNING 12/15/2017 14:41:50 Source: Collection Source(1/1) switched to FINISHED 12/15/2017 14:41:50 Sink: Unnamed(3/4) switched to FINISHED 12/15/2017 14:41:50 Sink: Unnamed(4/4) switched to FINISHED 12/15/2017 14:42:50 Sink: Unnamed(1/4) switched to FAILED <b> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. </b> 12/15/2017 14:42:50 Sink: Unnamed(2/4) switched to FAILED org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 12/15/2017 14:42:50 Job execution switched to status FAILING. org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 12/15/2017 14:42:50 Job execution switched to status FAILED. [error] (run-main-0) org.apache.flink.runtime.client.JobExecutionException: Job execution failed. org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:933) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:876) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107 ) Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. [trace] Stack trace suppressed: run last *:run for the full output. java.lang.RuntimeException: Nonzero exit code: 1 at scala.sys.package$.error(package.scala:27) [trace] Stack trace suppressed: run last compile:run for the full output. [error] (compile:run) Nonzero exit code: 1 [error] Total time: 75 s, completed Dec 15, 2017 2:42:51 PM |
Created 12-18-2017 11:47 AM
Issue got resolved .
Follow this checklists --
1. Check Zookeeper running .
2. Check Kafka Producer and Consumer running fine on console, create one topic and list it this is to ensure that kafka running fine .
3. Similar version use in sbt
like for Kafka 0.9 below should be use :
org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion % "provided"
and import in scala program : import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer09, FlinkKafkaConsumer09}
For Kafka 0.10
org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided" And Import in scala program : import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer10, FlinkKafkaConsumer10}
Created 12-18-2017 08:46 AM
@Fabian Hueske Please guide here
Created 12-18-2017 11:47 AM
Issue got resolved .
Follow this checklists --
1. Check Zookeeper running .
2. Check Kafka Producer and Consumer running fine on console, create one topic and list it this is to ensure that kafka running fine .
3. Similar version use in sbt
like for Kafka 0.9 below should be use :
org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion % "provided"
and import in scala program : import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer09, FlinkKafkaConsumer09}
For Kafka 0.10
org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided" And Import in scala program : import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer10, FlinkKafkaConsumer10}