Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: The Cloudera Community will undergo maintenance on Saturday, August 17 at 12:00am PDT. See more info here.

Spark throwing classCastException when updateStateByKey is called

SOLVED Go to solution

Spark throwing classCastException when updateStateByKey is called

New Contributor

I am getting following errors when i execute jar via spark-submit --class ... location/to/jar ... 

As soon as updateStateByKey(..function..) is hit in the following code, it throws exception below:

JavaPairDStream<Integer, Long> responseCodeCountDStream = logObject
.transformToPair(MainApplication::responseCodeCount);
JavaPairDStream<Integer, Long> cumulativeResponseCodeCountDStream =
responseCodeCountDStream.updateStateByKey(COMPUTE_RUNNING_SUM);
cumulativeResponseCodeCountDStream.foreachRDD(rdd -> {
LOG.warn("Response code counts: " + rdd.take(100));
});

 

my pom looks like this

<dependencies>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<version>1.6.2</version>
</dependency>
<dependency> <!-- Spark SQL -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency> <!-- Spark Streaming -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.19</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>

<build>


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>allinone</shadedClassifierName>

<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>app.MainApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

 

 

16/11/03 10:26:20 WARN BlockManager: Putting block rdd_30_0 failed due to an exception
16/11/03 10:26:20 WARN BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
16/11/03 10:26:20 ERROR Executor: Exception in task 0.0 in stage 22.0 (TID 20)
java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1025)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:473)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:470)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/11/03 10:26:20 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 20, localhost): java.lang.ClassCastException: org.apache.spark.api.java.Optional cannot be cast to com.google.common.base.Optional
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:1025)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:473)
at org.apache.spark.streaming.api.java.JavaPairDStream$$anonfun$1.apply(JavaPairDStream.scala:470)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$updateStateByKey$3$$anonfun$7$$anonfun$apply$2.apply(PairDStreamFunctions.scala:435)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:360)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:951)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

1 ACCEPTED SOLUTION

Accepted Solutions

Re: Spark throwing classCastException when updateStateByKey is called

New Contributor

Solved by updating the version to latest

 

The latest version of spark uses Optional within the spark package. But older version uses Optional from google package. 

 

1 REPLY 1

Re: Spark throwing classCastException when updateStateByKey is called

New Contributor

Solved by updating the version to latest

 

The latest version of spark uses Optional within the spark package. But older version uses Optional from google package.