Created 10-19-2016 03:06 PM
I have a Hortonworks Hadoop cluster at HDP stack version 2.5, which includes Kafka 0.10. I have two brokers running and listening on `PLAINTEXT://localhost:6667`. However, when I change the port to `PLAINTEXT://localhost:9092` (due to company port restrictions), Spark jobs that started fine before (with port 6667) now fail with the following error:
```
16/10/19 17:02:00 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:515)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:151)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:158)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:877)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:877)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:877)
    at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
    at com.ncr.dataplatform.Runner.main(Runner.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/10/19 17:02:00 INFO SparkContext: Invoking stop() from shutdown hook
```
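For reference, the only change on the broker side is the listener port; sketched below (the actual bind address in our config is site-specific):

```properties
# Kafka broker configuration ("listeners" property in Ambari / server.properties)
# before:
# listeners=PLAINTEXT://localhost:6667
# after:
listeners=PLAINTEXT://localhost:9092
```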
On all cluster nodes I have `iptables` and `selinux` disabled, and clients outside the cluster are able to connect to Kafka on the new port. From any datanode that will run the Spark job, I can telnet to the Kafka brokers on both ports (6667 and 9092), and also to ZooKeeper on its configured port.
Any idea why this happens? I can't get much more information out of this error message, and I have run out of ideas.
Created 10-19-2016 04:28 PM
@Jose Luis Navarro Vicente, can you check whether you are able to connect via the Kafka CLIs?
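For example, something along these lines (paths assume the usual HDP layout; the broker host, ZooKeeper host, and topic name are placeholders):

```
# Producer against the new port
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list broker-host:9092 --topic test-topic

# Consumer (the old consumer resolves brokers through ZooKeeper)
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --zookeeper zk-host:2181 --topic test-topic --from-beginning
```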
Created 10-20-2016 08:00 AM
I am able to connect with `kafka-console-producer` from another machine, and I am also able to connect with `kafka-console-consumer`. In fact, other machines keep connecting and sending messages over the topics without issues; the only problem is when I try to submit a Spark job. I have specified the port change for this Spark job (the wiring looks roughly like the sketch below), but it just fails.
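Here is a minimal sketch of the kind of wiring involved, following the Spark 1.6 direct-stream Java API that appears in the stack trace (checkpoint path, broker host, and topic name are placeholders, not my real values):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class Runner {
    // Placeholders -- the real values are site-specific.
    private static final String CHECKPOINT_DIR = "hdfs:///user/spark/checkpoints/my-job";
    private static final String BROKERS = "broker-host:9092"; // the new port
    private static final Set<String> TOPICS = Collections.singleton("my-topic");

    public static void main(String[] args) {
        Function0<JavaStreamingContext> factory = () -> {
            SparkConf conf = new SparkConf().setAppName("kafka-direct-stream");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
            jssc.checkpoint(CHECKPOINT_DIR);

            Map<String, String> kafkaParams = new HashMap<>();
            // Broker list passed to the direct stream; this is where the
            // new port 9092 is specified.
            kafkaParams.put("metadata.broker.list", BROKERS);

            JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class,
                    kafkaParams, TOPICS);

            stream.print(); // stand-in for the real processing
            return jssc;
        };

        // The stack trace shows the failure inside this call, while Spark is
        // restoring DirectKafkaInputDStream checkpoint data.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}
```

One thing I notice in the trace: the exception is thrown from `DirectKafkaInputDStreamCheckpointData.restore`, i.e. while `JavaStreamingContext.getOrCreate` is rebuilding the context from an existing checkpoint rather than calling the factory. Could the restore step be reusing the Kafka parameters serialized into the checkpoint (which would still point at port 6667) instead of the ones in my current configuration?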