How to provide a different dependency for RDD in spark?


I have an application jar with the dependency okhttp 3.10.0, which has a dependency on okio 1.14.0.

The Spark2 client provides okhttp 2.4.0 and okio 1.4.0 in the /usr/hdp/current/spark2-client/jars directory. When I run the Spark application, I receive the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.NoSuchMethodError: okio.BufferedSource.readUtf8LineStrict(J)Ljava/lang/String;
    at okhttp3.internal.http1.Http1Codec.readHeaderLine(Http1Codec.java:215)
    at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
    at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
    at okhttp3.RealCall.execute(RealCall.java:77)
    ....
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

And:
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)

This occurs even when I bring in the two dependencies (okhttp pulls in okio transitively) during spark-submit with the following flag:

--packages com.squareup.okhttp3:okhttp:3.10.0

Any suggestions? I would like to be able to resolve these dependency conflicts not just for OkHttp but for any other libraries I bring into the Spark application.

1 ACCEPTED SOLUTION


The solution was to keep using the Maven Shade plugin (as in the original build) and add the class relocation option.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <relocations>
            <relocation>
                <pattern>okio</pattern>
                <shadedPattern>com.shaded.okio</shadedPattern>
            </relocation>
        </relocations>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
</plugin>

Make sure to apply the relocation option to the package names of the classes that conflict with the jars in Spark's /jars directory. This creates a 'private copy' of the dependency inside your application jar, so it cannot interfere with the underlying Spark dependencies. Just watch out for your Spark application becoming too large if you relocate many dependencies this way.
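
A quick way to verify that the relocation actually took effect is to list the contents of the shaded jar and check that the okio classes now live under the relocated package; the jar path below is only a placeholder for whatever your build produces:

# List classes in the shaded application jar (adjust the path to your build output).
# After relocation you should see com/shaded/okio/... entries and no plain okio/... entries.
jar tf target/my-spark-app-1.0.jar | grep -i okio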


REPLIES


This is the Maven Shade plugin configuration currently used in the project:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
</plugin>

Master Guru

I recommend building an uber jar. You can leave the Spark packages out of that jar, since those libraries are provided at run time.

<dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
      <exclusions>
...

      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id> 
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
      </plugin>   
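
The assembled jar can then be submitted as usual, with Spark supplying its own libraries at run time; the class and jar names below are just placeholders:

# Submit the uber jar produced by the assembly plugin (class and jar names are placeholders).
spark-submit \
  --master yarn \
  --class com.example.MyApp \
  target/my-spark-app-1.0-jar-with-dependencies.jar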


Hello Sunile Manjee (@sunile.manjee),

Thank you for reaching out!

We are already building an uber jar with the Maven Shade plugin; that is what I posted in the comment above, sorry for the confusion!

We resolved the error, although the way we resolved it does not seem advisable. The /usr/hdp/current/spark2-client/jars directory contains two relevant jars, okhttp-2.4.0.jar and okio-1.4.0.jar, while our uber jar bundles okhttp 3.10.0 and okio 1.14.0. As the stack trace above shows, the error is thrown when accessing okio classes. What we realized is that our okio 1.14.0 classes conflict with the okio-1.4.0.jar in /usr/hdp/current/spark2-client/jars (okhttp 3.10.0 does not conflict, because its package name changed between major versions). We therefore concluded that the jars in /usr/hdp/current/spark2-client/jars are being preferred over the classes bundled in our uber jar.
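
For reference, the conflicting versions shipped with the Spark2 client can be listed directly (the path is the same one mentioned above):

# Show which okhttp/okio jars the HDP Spark2 client puts on the classpath.
ls /usr/hdp/current/spark2-client/jars | grep -iE 'okhttp|okio'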

The workaround we used, which again is probably not a good one, was to move okio-1.4.0.jar out of the /usr/hdp/current/spark2-client/jars directory. After that, spark-submit picked up our okio 1.14.0 dependency and the application ran as intended.

So my follow-up question is: what is the intended way to override classes found in /usr/hdp/current/spark2-client/jars? Is there a way to give our own dependencies priority over the ones provided in this directory?

Thank you very much for your time,

-Alex

Master Guru

Take a look at spark.driver.userClassPathFirst and spark.executor.userClassPathFirst:

https://spark.apache.org/docs/latest/configuration.html
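
For example, they can be set on spark-submit like this (class and jar names are placeholders):

# Ask Spark to prefer classes from the user's jar over its own bundled versions
# (class and jar names below are placeholders).
spark-submit \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --class com.example.MyApp \
  target/my-spark-app-1.0-jar-with-dependencies.jar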


I tried setting both of those configuration properties to true, but now I receive the following error:
Exception in thread "main" org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3266)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3286)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3337)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3305)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:476)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:467)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1859)
    at org.apache.spark.scheduler.EventLoggingListener.<init>(EventLoggingListener.scala:68)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:529)
    ...
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The fix I found suggested was this one:
https://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file

But that solution did not work for me.


Additionally, that feature appears to be for cluster mode only, but we intend to run in YARN client mode. If it only applies in cluster mode, this fix would be of no use to us, as the change would never take effect.
