Support Questions
Find answers, ask questions, and share your expertise

Spark 1.6.0 in Scala on YARN with Kafka, Spark Streaming Error on Submit (yarn-client)

Super Guru

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class TwitterSentimentAnalysis --master yarn-client sentimentstreaming.jar --verbose

16/08/17 17:04:54 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
       at java.lang.Class.getConstructor0(Class.java:3082)
       at java.lang.Class.getConstructor(Class.java:1825)
       at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:103)
       at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
       at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$anonfun$9.apply(ReceiverTracker.scala:575)
       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$anonfun$9.apply(ReceiverTracker.scala:565)
       at org.apache.spark.SparkContext$anonfun$37.apply(SparkContext.scala:1992)
       at org.apache.spark.SparkContext$anonfun$37.apply(SparkContext.scala:1992)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:745)

This is on submit:

:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
[root@sandbox spark]#
[root@sandbox spark]# vi submitstreaming.sh
[root@sandbox spark]# ./submitstreaming.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
       	confs: [default]
       	found org.apache.spark#spark-streaming-kafka_2.10;1.6.0 in central
       	found org.apache.kafka#kafka_2.10;0.8.2.1 in central
       	found com.yammer.metrics#metrics-core;2.2.0 in central
       	found org.slf4j#slf4j-api;1.7.10 in central
       	found org.apache.kafka#kafka-clients;0.8.2.1 in central
       	found net.jpountz.lz4#lz4;1.3.0 in central
       	found org.xerial.snappy#snappy-java;1.1.2 in central
       	found com.101tec#zkclient;0.3 in central
       	found log4j#log4j;1.2.17 in central
       	found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.6.0/spark-streaming-kaf... ...
       	[SUCCESSFUL ] org.apache.spark#spark-streaming-kafka_2.10;1.6.0!spark-streaming-kafka_2.10.jar (146ms)
downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.1/kafka_2.10-0.8.2.1.jar ...
       	[SUCCESSFUL ] org.apache.kafka#kafka_2.10;0.8.2.1!kafka_2.10.jar (1068ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
       	[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (21ms)
downloading https://repo1.maven.org/maven2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar ...
       	[SUCCESSFUL ] com.yammer.metrics#metrics-core;2.2.0!metrics-core.jar (33ms)
downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.8.2.1/kafka-clients-0.8.2.1.jar ...
       	[SUCCESSFUL ] org.apache.kafka#kafka-clients;0.8.2.1!kafka-clients.jar (106ms)
downloading https://repo1.maven.org/maven2/com/101tec/zkclient/0.3/zkclient-0.3.jar ...
       	[SUCCESSFUL ] com.101tec#zkclient;0.3!zkclient.jar (31ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar ...
       	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.10!slf4j-api.jar (26ms)
downloading https://repo1.maven.org/maven2/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar ...
       	[SUCCESSFUL ] net.jpountz.lz4#lz4;1.3.0!lz4.jar (66ms)
downloading https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.2/snappy-java-1.1.2.jar ...
       	[SUCCESSFUL ] org.xerial.snappy#snappy-java;1.1.2!snappy-java.jar(bundle) (166ms)
downloading https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar ...
       	[SUCCESSFUL ] log4j#log4j;1.2.17!log4j.jar(bundle) (117ms)
:: resolution report :: resolve 3263ms :: artifacts dl 1790ms
       	:: modules in use:
       	com.101tec#zkclient;0.3 from central in [default]
       	com.yammer.metrics#metrics-core;2.2.0 from central in [default]
       	log4j#log4j;1.2.17 from central in [default]
       	net.jpountz.lz4#lz4;1.3.0 from central in [default]
       	org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
       	org.apache.kafka#kafka_2.10;0.8.2.1 from central in [default]
       	org.apache.spark#spark-streaming-kafka_2.10;1.6.0 from central in [default]
       	org.slf4j#slf4j-api;1.7.10 from central in [default]
       	org.spark-project.spark#unused;1.0.0 from central in [default]
       	org.xerial.snappy#snappy-java;1.1.2 from central in [default]
       	---------------------------------------------------------------------
       	|                  |            modules            ||   artifacts   |
       	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
       	---------------------------------------------------------------------
       	|      default     |   10  |   10  |   10  |   0   ||   10  |   10  |
       	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
       	confs: [default]
       	10 artifacts copied, 0 already retrieved (6067kB/20ms)
16/08/17 17:04:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/17 17:04:29 INFO Slf4jLogger: Slf4jLogger started
16/08/17 17:04:29 INFO Remoting: Starting remoting
16/08/17 17:04:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.0.2.15:42397]
16/08/17 17:04:30 INFO Server: jetty-8.y.z-SNAPSHOT
16/08/17 17:04:30 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/08/17 17:04:30 INFO Server: jetty-8.y.z-SNAPSHOT
16/08/17 17:04:30 INFO AbstractConnector: Started SocketConnector@0.0.0.0:38910
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
16/08/17 17:04:31 INFO TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
16/08/17 17:04:32 INFO RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
16/08/17 17:04:32 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/08/17 17:04:33 INFO YarnClientImpl: Submitted application application_1471437048143_0007
16/08/17 17:04:44 INFO TwitterSentimentAnalysis: Using Twitter topic
16/08/17 17:04:52 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
       	at java.lang.Class.getConstructor0(Class.java:3082)
       	at java.lang.Class.getConstructor(Class.java:1825)
       	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:103)
       	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
       	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
       	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
       	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
       	at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       	at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       	at org.apache.spark.scheduler.Task.run(Task.scala:89)
       	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
       	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       	at java.lang.Thread.run(Thread.java:745)


 

Scala Source Snippit

val topicMaps = Map("tweets" -> 1)

logger.info("Using Twitter topic")

// Create a new stream for JSON String
val tweets = KafkaUtils.createStream(ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

try {
  tweets.foreachRDD((rdd, time) => {
1 ACCEPTED SOLUTION

Super Guru

I was missing the StringDecoder, make sure you have those!!!

val tweets = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

View solution in original post

5 REPLIES 5

Super Guru

May be a coding error, left out the StringDecoder.

Super Guru

Caused by: java.sql.SQLException: Exception during creation of file /tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat for container at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source) at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source) at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source) ... 97 more Caused by: java.sql.SQLException: Java exception: '/tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat (Too many open files): java.io.FileNotFoundException'. at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source) at org.apache.derby.impl.jdbc.Util.javaException(Unknown Source) at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source) ... 98 more Caused by: java.io.FileNotFoundException: /tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat (Too many open files) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at org.apache.derby.impl.io.DirRandomAccessFile.<init>(Unknown Source) at org.apache.derby.impl.io.DirFile4.getRandomAccessFile(Unknown Source) at org.apache.derby.impl.store.raw.data.RAFContainer.run(Unknown Source) at java.security.AccessController.doPrivileged(Native Method) at org.apache.derby.impl.store.raw.data.RAFContainer.createContainer(Unknown Source) at org.apache.derby.impl.store.raw.data.RAFContainer4.createContainer(Unknown Source) at org.apache.derby.impl.store.raw.data.FileContainer.createIdent(Unknown Source) at org.apache.derby.impl.store.raw.data.FileContainer.createIdentity(Unknown Source) at org.apache.derby.impl.services.cache.ConcurrentCache.create(Unknown Source)

Expert Contributor

I noticed "Too many open files" so you might want to check your ulimit setting. Check here for guidance: https://community.hortonworks.com/questions/2029/best-practices-for-ulimits-number-of-open-file-dec....

Super Guru

that was my second problem after fixing the stringdecoder, so I upped my ulimit and that removed that second issue. Thanks!

Super Guru

I was missing the StringDecoder, make sure you have those!!!

val tweets = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

; ;