Override libraries for Spark

Explorer

 Hello,

I would like to use newer versions of some of the libraries listed in /etc/spark/conf/classpath.txt.

What is the recommended way to do that? I add other libraries using spark-submit's --jars (I have the jars on HDFS; a sketch is below), but this does not work for newer versions of libraries that are already in classpath.txt.

Alternatively, is there a way to disable the construction of classpath.txt and rely solely on the libraries provided to spark-submit (except possibly Spark and Hadoop)?

I'm running Spark on YARN (cluster mode).
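For illustration, the submission currently looks roughly like this (the class name, jar names and HDFS paths are placeholders, not the real ones):

spark-submit \
  --master yarn-cluster \
  --class com.example.Main \
  --jars hdfs:///libs/some-lib-2.8.1.jar,hdfs:///libs/other-lib-1.2.jar \
  my-app.jar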

 

Thank you!

1 ACCEPTED SOLUTION

Master Collaborator
I remember some problems with snappy and HBase like this: somehow an older version used by HBase ended up taking precedence in the app classloader, and then it could not quite load properly because it couldn't see the shared library in the parent classloader. This may be a manifestation of that issue. There are certainly cases where there is no resolution to the conflict, since an app and Spark may use mutually incompatible versions of a dependency, and one will interfere with the other if the Spark and app classloaders are connected, no matter what their ordering is.

For this toy example, you'd simply not set the userClassPathFirst setting, since it isn't needed. For your app, if neither combination works, your options are probably to harmonize library versions with Spark, or to shade your copy of the library.



10 REPLIES

Master Collaborator
I wouldn't modify that file. Instead, include your libraries with your app or via --jars, but also try setting spark.driver.userClassPathFirst and spark.executor.userClassPathFirst to true (sketched below). Resolving these conflicts is tricky in Spark whenever you use a library that Spark also uses and does not shade, but this is the answer in most cases.
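As a rough sketch (the jar name, path and class are placeholders), that would look something like:

spark-submit \
  --master yarn-cluster \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --jars hdfs:///libs/your-newer-lib.jar \
  --class com.example.Main \
  your-app.jar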

Explorer

We have already tried setting the two "userClassPathFirst" switches, but unfortunately we ended up with the same strange exception:

 

ERROR yarn.ApplicationMaster: User class threw exception: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:157)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
at scala.Option.map(Option.scala:145)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:199)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)

...

 

What I don't understand is why SPARK_DIST_CLASSPATH is being set at all. A vanilla Spark installation has everything in a single assembly jar, and any additional dependency must be specified explicitly, correct?

Would it be possible to completely replace classpath.txt with user-provided dependencies?

Thanks!

Master Collaborator
That's a different type of conflict. You somehow have a different version of snappy in your app classpath, maybe? You aren't including Spark/Hadoop in your app jar, right?

The Spark assembly only contains Hadoop jars if it is built that way, but in a CDH cluster that's not a good idea, since the cluster already has its own copy of the Hadoop artifacts. Instead it's built as 'hadoop-provided', and the classpath then contains the Hadoop jars and their dependencies, plus Spark's.
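Roughly speaking (the exact script differs between CDH versions, so treat this as an illustration rather than the literal CDH code), spark-env.sh builds SPARK_DIST_CLASSPATH from classpath.txt along these lines:

# Illustration only: join the entries of classpath.txt into a single
# colon-separated string and export it for spark-submit to pick up.
export SPARK_DIST_CLASSPATH="$(paste -sd: /etc/spark/conf/classpath.txt)"

That is why the entries in classpath.txt end up on every application's classpath.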

Modifying this means modifying the distribution for all applications.
It may or may not work with the rest of CDH and may or may not work
with other apps. These modifications aren't supported, though you can
try whatever you want if you are OK with 'voiding the warranty' so to
speak.

Spark classpath issues are tricky in general, not just in CDH, since
Spark uses a load of libraries and doesn't shade most of them. Yes,
you can try shading your own copies as a fall-back if the
classpath-first args don't work. But you might need to double-check
what you are trying to bring in.

Explorer

I've just tried a simple application with nothing but a Main class in the jar file. This is the code:

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._

import org.apache.spark.rdd.NewHadoopRDD

object Main {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())

    // HBase configuration: ZooKeeper quorum plus the input/output table names
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "my.server")
    //conf.set("hbase.zookeeper.property.clientPort", "2181")

    val explicitNamespace: String = "BankData"
    val qualifiedTableName: String = explicitNamespace + ':' + "bank"

    conf.set(TableInputFormat.INPUT_TABLE, qualifiedTableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName)
    conf.set(TableInputFormat.INPUT_TABLE, "tmp")

    // Read the HBase table through the new Hadoop InputFormat API
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
  }

}

I run this with:

spark-submit --conf "spark.driver.userClassPathFirst=true" --conf "spark.executor.userClassPathFirst=true" --class Main --master yarn-cluster test2.jar 

I got the following exception:

15/09/22 14:34:28 ERROR yarn.ApplicationMaster: User class threw exception: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
	at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
	at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
	at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
	at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:157)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:199)
	at scala.Option.map(Option.scala:145)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:199)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
	at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
	at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
	at Main$.main(Main.scala:29)
	at Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)

I'm running CDH 5.4.3

Master Collaborator
Hm, but have you modified classpath.txt? IIRC, the last time I saw this it was some strange problem between the snappy used by HBase and the one used by other things like Spark. Does it work without the userClassPathFirst arg? Just trying to narrow it down. Turning on this flag is always problem territory, but that's a simple example with no obvious reason it shouldn't work.

Explorer

I did not modify classpath.txt, since I don't want to touch files generated by Cloudera.

Without the userClassPathFirst switch it works correctly. Unfortunately, I need the switch to replace a few other libraries that are already in classpath.txt.

Master Collaborator
I remember some problems with snappy and HBase like this: somehow an older version used by HBase ended up taking precedence in the app classloader, and then it could not quite load properly because it couldn't see the shared library in the parent classloader. This may be a manifestation of that issue. There are certainly cases where there is no resolution to the conflict, since an app and Spark may use mutually incompatible versions of a dependency, and one will interfere with the other if the Spark and app classloaders are connected, no matter what their ordering is.

For this toy example, you'd simply not set the userClassPathFirst setting, since it isn't needed. For your app, if neither combination works, your options are probably to harmonize library versions with Spark, or to shade your copy of the library.


New Contributor

I ran into the same problem. Without "spark.executor.userClassPathFirst" set to true, I have a problem with the joda-time library: Spark uses an older version of joda-time (classpath.txt shows version 2.1), whereas the application needs 2.8.1 or higher...

 

With the "spark.executor.userClassPathFirst" set to true (and joda-time-2.8.1 provided with --jars option), I run into snappy-java version problems leading to 

java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/lang/Object;II)I
	at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
	at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:541)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:350)
	at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
	at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
	at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
	at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
	at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

 

Is there a way to know which version of snappy-java is being picked up and from which location? 

New Contributor

I did try a fat jar (shade plugin) that includes joda-time-2.9.jar, but the classes are still picked up from the spark-assembly jar!

 

I had the following line of code:

 

logger.info("DateTime classes version = " + new DateTime().getClass().getProtectionDomain().getCodeSource());

which still logs it as 

 

DateTime classes version = (file:/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p1310.1096/jars/spark-assembly-1.5.0-cdh5.5.2-hadoop2.6.0-cdh5.5.2.jar

Maven shade plugin configuration:

 

<plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <configuration/>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

 

So, what option do I try?