Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Spark throwing classCastException when updateStateByKey is called

avatar
New Contributor

I am getting following errors when i execute jar via spark-submit --class ... location/to/jar ... 

As soon as updateStateByKey(..function..) is hit in the following code, it throws exception below:

JavaPairDStream<Integer, Long> responseCodeCountDStream = logObject
.transformToPair(MainApplication::responseCodeCount);
JavaPairDStream<Integer, Long> cumulativeResponseCodeCountDStream =
responseCodeCountDStream.updateStateByKey(COMPUTE_RUNNING_SUM);
cumulativeResponseCodeCountDStream.foreachRDD(rdd -> {
LOG.warn("Response code counts: " + rdd.take(100));
});

 

my pom looks like this

<dependencies>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<version>1.6.2</version>
</dependency>
<dependency> <!-- Spark SQL -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency> <!-- Spark Streaming -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.19</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>

<build>


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>allinone</shadedClassifierName>

<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>app.MainApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

 

 

16/11/03 10:26:20 WARN BlockManager: Putting block rdd_30_0 failed due to an exception
16/11/03 10:26:20 WARN BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
16/11/03 10:26:20 ERROR Executor: Exception in task 0.0 in stage 22.0 (TID 20)
java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1025)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:473)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:470)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/11/03 10:26:20 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 20, localhost): java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1025)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:473)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:470)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

1 ACCEPTED SOLUTION

avatar
New Contributor

Solved by updating the version to latest

 

The latest version of spark uses Optional within the spark package. But older version uses Optional from google package. 

 

View solution in original post

1 REPLY 1

avatar
New Contributor

Solved by updating the version to latest

 

The latest version of spark uses Optional within the spark package. But older version uses Optional from google package.