Spark throwing ClassCastException when updateStateByKey is called

I am getting the following error when I execute my jar via spark-submit --class ... location/to/jar ...

As soon as updateStateByKey(..function..) is hit in the following code, it throws the exception shown at the bottom of this post:

JavaPairDStream<Integer, Long> responseCodeCountDStream = logObject
        .transformToPair(MainApplication::responseCodeCount);
JavaPairDStream<Integer, Long> cumulativeResponseCodeCountDStream =
        responseCodeCountDStream.updateStateByKey(COMPUTE_RUNNING_SUM);
cumulativeResponseCodeCountDStream.foreachRDD(rdd -> {
    LOG.warn("Response code counts: " + rdd.take(100));
});
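
(COMPUTE_RUNNING_SUM is the state-update function; its body is not shown above. Against the Spark 1.6 Java API it would be declared roughly like the sketch below -- the value types and the running-sum body here are assumptions -- and the Guava Optional in its signature is exactly the type that the cast in the stack trace complains about.)

import java.util.List;
import com.google.common.base.Optional;              // Spark 1.6's Java API uses Guava's Optional
import org.apache.spark.api.java.function.Function2;

static final Function2<List<Long>, Optional<Long>, Optional<Long>> COMPUTE_RUNNING_SUM =
        (newCounts, state) -> {
            long sum = state.or(0L);                 // Guava Optional: or(), not orElse()
            for (Long count : newCounts) {
                sum += count;                        // add this batch's counts
            }
            return Optional.of(sum);                 // state carried into the next batch
        };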

My pom.xml looks like this:

<dependencies>
    <dependency> <!-- Spark -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
        <version>1.6.2</version>
    </dependency>
    <dependency> <!-- Spark SQL -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency> <!-- Spark Streaming -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>19.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <shadedArtifactAttached>true</shadedArtifactAttached>
                        <shadedClassifierName>allinone</shadedClassifierName>
                        <artifactSet>
                            <includes>
                                <include>*:*</include>
                            </includes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>app.MainApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

The exception:

16/11/03 10:26:20 WARN BlockManager: Putting block rdd_30_0 failed due to an exception
16/11/03 10:26:20 WARN BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
16/11/03 10:26:20 ERROR Executor: Exception in task 0.0 in stage 22.0 (TID 20)
java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1025)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:473)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:470)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/11/03 10:26:20 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 20, localhost): java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional

1 ACCEPTED SOLUTION
Re: Spark throwing ClassCastException when updateStateByKey is called


Solved by updating the Spark version in the pom to the latest release.

Newer versions of Spark (2.x) ship their own Optional class, org.apache.spark.api.java.Optional, in the Java API, while older versions (1.x) used Optional from Google's Guava library, com.google.common.base.Optional. The jar above was built against Spark 1.6.2 but ran on a Spark 2.x runtime (the stack trace references org.apache.spark.api.java.Optional, which only exists in 2.x), so the Guava-typed update function no longer matched what the runtime passed in.
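
Concretely, assuming a Spark 2.0.x cluster, the Spark entries in the pom would move to matching 2.x versions (the version numbers below are illustrative; use whatever matches your cluster). Note that the Kafka integration artifact was renamed in 2.x:

<dependency> <!-- Spark core, aligned with the cluster's 2.x runtime -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency> <!-- Spark Streaming -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<!-- In 2.x the Kafka integration is split per Kafka version,
     e.g. spark-streaming-kafka-0-8_2.11 replaces spark-streaming-kafka_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>

After the upgrade, the update function compiles against Spark's own Optional instead of Guava's. A minimal sketch, with the function name and value types carried over from the question:

import java.util.List;
import org.apache.spark.api.java.Optional;           // Spark's Optional, not Guava's
import org.apache.spark.api.java.function.Function2;

static final Function2<List<Long>, Optional<Long>, Optional<Long>> COMPUTE_RUNNING_SUM =
        (newCounts, state) -> {
            long sum = state.orElse(0L);             // previous running total, if any
            for (Long count : newCounts) {
                sum += count;                        // add this batch's counts
            }
            return Optional.of(sum);                 // state carried into the next batch
        };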

 
