
Can't submit Spark job after Kafka port change

I have a Hortonworks Data Platform (HDP) 2.5 cluster, which includes Kafka 0.10; I have two brokers running and listening on `PLAINTEXT://localhost:6667`. However, when I change the port to `PLAINTEXT://localhost:9092` (because of company port restrictions), I can no longer start Spark jobs that worked before (with port 6667); they fail with the following error:

```
16/10/19 17:02:00 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:197)
        at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:515)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:516)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:516)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:151)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:151)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:158)
        at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:877)
        at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:877)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:877)
        at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:775)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
        at com.ncr.dataplatform.Runner.main(Runner.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/10/19 17:02:00 INFO SparkContext: Invoking stop() from shutdown hook
```

On all cluster nodes, `iptables` and `selinux` are disabled, and people can connect to Kafka's new port from outside the cluster. From any datanode that will run the Spark job, I can telnet to the Kafka brokers on both ports (6667 and 9092), and also to ZooKeeper on its configured port.
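For reference, the broker-side change is just the listener port in `server.properties` (managed through Ambari on HDP); a minimal sketch of the one line that changed, assuming defaults otherwise:

```properties
# Kafka broker listener, changed from the HDP default 6667 to 9092
listeners=PLAINTEXT://localhost:9092
```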

Any idea why this happens? I can't get much more information out of this error message, and I've run out of ideas.

2 REPLIES

Re: Can't submit Spark job after Kafka port change

@Jose Luis Navarro Vicente, can you check whether you are able to connect via the Kafka CLIs?
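For example, something along these lines against the new port, using the standard HDP install paths (the host and topic names here are placeholders):

```shell
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
  --broker-list broker1:9092 --topic test-topic

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --new-consumer --bootstrap-server broker1:9092 \
  --topic test-topic --from-beginning
```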


Re: Can't submit Spark job after Kafka port change

I am able to connect with `kafka-console-producer` from another machine, and also with `kafka-console-consumer`. In fact, other machines can keep connecting and sending messages to topics; the only problem is when I try to submit the Spark job. I have specified the port change for the Spark job, but it just fails.
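For reference, below is a minimal sketch of how such a job is typically wired up with the spark-streaming-kafka 0.8 direct API that appears in the stack trace (app, topic, and checkpoint names are hypothetical, not taken from the actual job). One detail worth noting: the trace goes through `JavaStreamingContext.getOrCreate` into `DirectKafkaInputDStreamCheckpointData.restore`, i.e. the context is being rebuilt from an existing checkpoint, and in that case the factory function, including any updated broker list inside it, is not invoked:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaPortSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical checkpoint location; if a checkpoint exists here,
        // getOrCreate restores the whole DStream graph from it instead of
        // calling the factory lambda below.
        final String checkpointDir = "hdfs:///tmp/checkpoints/kafka-port-sketch";

        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, () -> {
            SparkConf conf = new SparkConf().setAppName("KafkaPortSketch");
            JavaStreamingContext ctx = new JavaStreamingContext(conf, Durations.seconds(10));
            ctx.checkpoint(checkpointDir);

            Map<String, String> kafkaParams = new HashMap<>();
            // The new broker port goes here, but this line only runs when no
            // checkpoint exists yet; a restored context keeps the old value
            // that was serialized into the checkpoint.
            kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");

            Set<String> topics = new HashSet<>();
            topics.add("test-topic"); // hypothetical topic

            KafkaUtils.createDirectStream(ctx,
                    String.class, String.class,
                    StringDecoder.class, StringDecoder.class,
                    kafkaParams, topics)
                .print();

            return ctx;
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
```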