Spark Streaming Stopped

We have a Kafka topic that uses Avro with a schema registry, and we want to use this topic as a source for Spark Streaming, so we use the code below:

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient  # assumed client library
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from pyspark.streaming.kafka import KafkaUtils

schema_registry_client = CachedSchemaRegistryClient(url='URL')
serializer = MessageSerializer(schema_registry_client)

# Decode Confluent-framed Avro record bytes via the schema registry
def decoder(s):
    return serializer.decode_message(s)

# createDirectStream takes a list of topic names
kvs = KafkaUtils.createDirectStream(ssc, [topic], {'bootstrap.servers': brokers}, valueDecoder=decoder)
lines = kvs.map(lambda x: x[1])
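
The stream is then started and each mini-batch is printed on the terminal. Those driver lines were not shown above, so here is a minimal sketch of what they would look like:

lines.pprint()          # print the first few records of each mini-batch
ssc.start()
ssc.awaitTermination()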

However, when we execute this code, it stops after a few minutes (after printing a few mini-batches on the terminal).

Error:

java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:196)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:212)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

2018-10-15 14:49:00 ERROR TaskSetManager:70 - Task 0 in stage 50.0 failed 4 times; aborting job
2018-10-15 14:49:00 INFO YarnScheduler:54 - Removed TaskSet 50.0, whose tasks have all completed, from pool
2018-10-15 14:49:00 INFO YarnScheduler:54 - Cancelling stage 50
2018-10-15 14:49:00 INFO DAGScheduler:54 - ResultStage 50 (runJob at PythonRDD.scala:149) failed in 0.279 s due to Job aborted due to stage failure: Task 0 in stage 50.0 failed 4 times, most recent failure: Lost task 0.3 in stage 50.0 (TID 103, gen-pickme-slavea, executor 3): org.apache.spark.SparkException: Couldn't connect to leader for topic TOPIC NAME 2: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:168)
    at scala.util.Either.fold(Either.scala:98)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:167)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:159)
    at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
2018-10-15 14:49:00 INFO DAGScheduler:54 - Job 50 failed: runJob at PythonRDD.scala:149, took 0.282889 s
2018-10-15 14:49:00 INFO JobScheduler:54 - Finished job streaming job 1539595140000 ms.0 from job set of time 1539595140000 ms
2018-10-15 14:49:00 ERROR JobScheduler:91 - Error running job streaming job 1539595140000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/user/spark2/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/home/user/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 171, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/home/user/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1375, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/home/user/spark2/python/lib/pyspark.zip/pyspark/context.py", line 1013, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/home/user/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/user/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
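
The traces above fail in the Kafka fetch path (connectLeader / SimpleConsumer.fetch) rather than inside the Avro decoder, but to rule decoding out we could wrap it defensively. A hypothetical sketch (safe_decoder is not part of the code above):

def safe_decoder(s):
    # Hypothetical guard: skip records that fail Avro decoding instead of
    # letting one bad record abort the task; filter out the Nones downstream.
    try:
        return serializer.decode_message(s)
    except Exception as exc:
        print('Avro decode failed: %s' % exc)
        return None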

What could be the reason?
