Spark 1.6.0 in Scala on YARN with Kafka, Spark Streaming Error on Submit (yarn-client)

Master Guru

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class TwitterSentimentAnalysis --master yarn-client sentimentstreaming.jar --verbose
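
Note: spark-submit treats everything after the application jar as arguments to the application itself, so the trailing --verbose above is passed to TwitterSentimentAnalysis rather than to spark-submit. If the flag is meant for spark-submit, it belongs before the jar:

spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class TwitterSentimentAnalysis --master yarn-client sentimentstreaming.jar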

16/08/17 17:04:54 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
       at java.lang.Class.getConstructor0(Class.java:3082)
       at java.lang.Class.getConstructor(Class.java:1825)
       at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:103)
       at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
       at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
       at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at java.lang.Thread.run(Thread.java:745)

This is the full console output on submit:

:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
[root@sandbox spark]#
[root@sandbox spark]# vi submitstreaming.sh
[root@sandbox spark]# ./submitstreaming.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.4.0.0-169/spark/lib/spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
       	confs: [default]
       	found org.apache.spark#spark-streaming-kafka_2.10;1.6.0 in central
       	found org.apache.kafka#kafka_2.10;0.8.2.1 in central
       	found com.yammer.metrics#metrics-core;2.2.0 in central
       	found org.slf4j#slf4j-api;1.7.10 in central
       	found org.apache.kafka#kafka-clients;0.8.2.1 in central
       	found net.jpountz.lz4#lz4;1.3.0 in central
       	found org.xerial.snappy#snappy-java;1.1.2 in central
       	found com.101tec#zkclient;0.3 in central
       	found log4j#log4j;1.2.17 in central
       	found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.6.0/spark-streaming-kaf... ...
       	[SUCCESSFUL ] org.apache.spark#spark-streaming-kafka_2.10;1.6.0!spark-streaming-kafka_2.10.jar (146ms)
downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2.1/kafka_2.10-0.8.2.1.jar ...
       	[SUCCESSFUL ] org.apache.kafka#kafka_2.10;0.8.2.1!kafka_2.10.jar (1068ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
       	[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (21ms)
downloading https://repo1.maven.org/maven2/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar ...
       	[SUCCESSFUL ] com.yammer.metrics#metrics-core;2.2.0!metrics-core.jar (33ms)
downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.8.2.1/kafka-clients-0.8.2.1.jar ...
       	[SUCCESSFUL ] org.apache.kafka#kafka-clients;0.8.2.1!kafka-clients.jar (106ms)
downloading https://repo1.maven.org/maven2/com/101tec/zkclient/0.3/zkclient-0.3.jar ...
       	[SUCCESSFUL ] com.101tec#zkclient;0.3!zkclient.jar (31ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar ...
       	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.10!slf4j-api.jar (26ms)
downloading https://repo1.maven.org/maven2/net/jpountz/lz4/lz4/1.3.0/lz4-1.3.0.jar ...
       	[SUCCESSFUL ] net.jpountz.lz4#lz4;1.3.0!lz4.jar (66ms)
downloading https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.2/snappy-java-1.1.2.jar ...
       	[SUCCESSFUL ] org.xerial.snappy#snappy-java;1.1.2!snappy-java.jar(bundle) (166ms)
downloading https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar ...
       	[SUCCESSFUL ] log4j#log4j;1.2.17!log4j.jar(bundle) (117ms)
:: resolution report :: resolve 3263ms :: artifacts dl 1790ms
       	:: modules in use:
       	com.101tec#zkclient;0.3 from central in [default]
       	com.yammer.metrics#metrics-core;2.2.0 from central in [default]
       	log4j#log4j;1.2.17 from central in [default]
       	net.jpountz.lz4#lz4;1.3.0 from central in [default]
       	org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
       	org.apache.kafka#kafka_2.10;0.8.2.1 from central in [default]
       	org.apache.spark#spark-streaming-kafka_2.10;1.6.0 from central in [default]
       	org.slf4j#slf4j-api;1.7.10 from central in [default]
       	org.spark-project.spark#unused;1.0.0 from central in [default]
       	org.xerial.snappy#snappy-java;1.1.2 from central in [default]
       	---------------------------------------------------------------------
       	|                  |            modules            ||   artifacts   |
       	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
       	---------------------------------------------------------------------
       	|      default     |   10  |   10  |   10  |   0   ||   10  |   10  |
       	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
       	confs: [default]
       	10 artifacts copied, 0 already retrieved (6067kB/20ms)
16/08/17 17:04:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/17 17:04:29 INFO Slf4jLogger: Slf4jLogger started
16/08/17 17:04:29 INFO Remoting: Starting remoting
16/08/17 17:04:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.0.2.15:42397]
16/08/17 17:04:30 INFO Server: jetty-8.y.z-SNAPSHOT
16/08/17 17:04:30 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/08/17 17:04:30 INFO Server: jetty-8.y.z-SNAPSHOT
16/08/17 17:04:30 INFO AbstractConnector: Started SocketConnector@0.0.0.0:38910
spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
16/08/17 17:04:31 INFO TimelineClientImpl: Timeline service address: http://sandbox.hortonworks.com:8188/ws/v1/timeline/
16/08/17 17:04:32 INFO RMProxy: Connecting to ResourceManager at sandbox.hortonworks.com/10.0.2.15:8050
16/08/17 17:04:32 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/08/17 17:04:33 INFO YarnClientImpl: Submitted application application_1471437048143_0007
16/08/17 17:04:44 INFO TwitterSentimentAnalysis: Using Twitter topic
16/08/17 17:04:52 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
       	at java.lang.Class.getConstructor0(Class.java:3082)
       	at java.lang.Class.getConstructor(Class.java:1825)
       	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:103)
       	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
       	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
       	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
       	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
       	at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       	at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992)
       	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       	at org.apache.spark.scheduler.Task.run(Task.scala:89)
       	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
       	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       	at java.lang.Thread.run(Thread.java:745)

Scala Source Snippet

val topicMaps = Map("tweets" -> 1)

logger.info("Using Twitter topic")

// Create a new stream for JSON String
val tweets = KafkaUtils.createStream(ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

try {
  tweets.foreachRDD((rdd, time) => {
1 ACCEPTED SOLUTION

Master Guru

I was missing the StringDecoder type parameters; make sure you include them!

val tweets = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
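
For completeness, a fuller sketch of the corrected call with the imports it needs. The ZooKeeper quorum, group id, and batch interval below are illustrative placeholders, not values from the original post:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("TwitterSentimentAnalysis")
val ssc = new StreamingContext(sparkConf, Seconds(10)) // illustrative batch interval

// Illustrative consumer settings; substitute your own quorum and group id.
val kafkaConf = Map(
  "zookeeper.connect" -> "sandbox.hortonworks.com:2181",
  "group.id" -> "twitter-sentiment")

val topicMaps = Map("tweets" -> 1)

// The four type parameters pin down the key/value types and their decoders.
// Left off, Scala infers Nothing for all four, and the receiver then tries to
// reflectively construct a scala.runtime.Nothing$ decoder, which is exactly
// the NoSuchMethodException in the log above.
val tweets = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

tweets.map(_._2).print() // the tuple values are the JSON tweet strings
ssc.start()
ssc.awaitTermination()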

5 REPLIES

Master Guru

It may be a coding error on my part: I left out the StringDecoder.

Master Guru

Caused by: java.sql.SQLException: Exception during creation of file /tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat for container
       at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
       at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
       at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
       ... 97 more
Caused by: java.sql.SQLException: Java exception: '/tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat (Too many open files): java.io.FileNotFoundException'.
       at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source)
       at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
       at org.apache.derby.impl.jdbc.Util.javaException(Unknown Source)
       at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
       ... 98 more
Caused by: java.io.FileNotFoundException: /tmp/spark-f4651a14-f0f5-422a-908a-1fb812c862ed/metastore/seg0/c2f0.dat (Too many open files)
       at java.io.RandomAccessFile.open0(Native Method)
       at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
       at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
       at org.apache.derby.impl.io.DirRandomAccessFile.<init>(Unknown Source)
       at org.apache.derby.impl.io.DirFile4.getRandomAccessFile(Unknown Source)
       at org.apache.derby.impl.store.raw.data.RAFContainer.run(Unknown Source)
       at java.security.AccessController.doPrivileged(Native Method)
       at org.apache.derby.impl.store.raw.data.RAFContainer.createContainer(Unknown Source)
       at org.apache.derby.impl.store.raw.data.RAFContainer4.createContainer(Unknown Source)
       at org.apache.derby.impl.store.raw.data.FileContainer.createIdent(Unknown Source)
       at org.apache.derby.impl.store.raw.data.FileContainer.createIdentity(Unknown Source)
       at org.apache.derby.impl.services.cache.ConcurrentCache.create(Unknown Source)

Super Collaborator

I noticed "Too many open files" so you might want to check your ulimit setting. Check here for guidance: https://community.hortonworks.com/questions/2029/best-practices-for-ulimits-number-of-open-file-dec....

Master Guru

That was my second problem after fixing the StringDecoder; I upped my ulimit and that resolved the second issue. Thanks!
