
New Compression codec


Contributor

Hello,

 

I'm experimenting with a new compression codec for Spark (apart from the three standard ones: lz4, lzf, and snappy).

I've written the codec in Java and created a jar file.

In Spark standalone mode, I was able to use the new codec successfully via the spark.io.compression.codec parameter. I'm using spark-submit to run my jobs.
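
For reference, the standalone submit looks roughly like this (the application class and jar names below are placeholders, not my real ones):

spark-submit --master spark://<master-host>:7077 \
  --conf spark.io.compression.codec=com.test.new_codec \
  --jars new_codec.jar \
  --class com.example.MyApp my_app.jar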

 

However, when using Spark on YARN, things fail: I see an exception saying that the constructor for the codec could not be found. Perhaps Spark isn't finding my new jar file?

 

This sounds like a simple issue, but I'm new to this, so I would appreciate some leads.

 

Thanks.


Re: New Compression codec

Master Guru
Can you share the full failing stack trace, along with a note on whether you have an empty default constructor in your code?

Spark instantiates your codec via reflection and needs an empty default constructor for that to work: https://github.com/cloudera/spark/blob/cdh5.8.0-release/core/src/main/scala/org/apache/spark/io/Comp...

Re: New Compression codec

Contributor

Thanks for your reply. I had posted a response a couple of days ago, but it seems to have vanished, so I'm responding again now.

 

To your questions:

a) I didn't have a default (no-argument) constructor for the codec. I created one, but the problem is still seen. Besides, the codec works fine without the default constructor in standalone mode.

b) Below is the stack trace from the failure when YARN is used, on a cluster. Further suggestions would be appreciated. Thanks.

 

User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, nje-amazon4): java.io.IOException: java.lang.NoSuchMethodException: com.test.new_codec.<init>(org.apache.spark.SparkConf)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodException: com.test.new_codec.<init>(org.apache.spark.SparkConf)
at java.lang.Class.getConstructor0(Class.java:2849)
at java.lang.Class.getConstructor(Class.java:1718)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:66)
at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
... 12 more

Re: New Compression codec

Master Guru
Looks like I was incorrect about the empty constructor. The Scala code in that snippet actually requires the constructor to accept a SparkConf argument, rather than being empty.

Does your standalone-mode run exercise the codec routines in the same manner as YARN does, i.e. do you see your load/destroy messages? The part where it's failing is within the Spark core packages, rather than being specific to the YARN libraries.

Per the error you see, is your "new_codec" a Scala class defined in the manner of "class new_codec(conf: SparkConf) extends CompressionCodec"?
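
In Java, the required shape would be roughly the following. This is only an illustrative sketch (the Deflate-backed streams here stand in for whatever your codec actually does); the important part is the public constructor taking a single SparkConf, which is exactly the signature the reflective lookup in your trace failed to find:

import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.spark.SparkConf;
import org.apache.spark.io.CompressionCodec;

public class new_codec implements CompressionCodec {

    // Spark looks this constructor up reflectively via
    // getConstructor(SparkConf.class), so this exact signature must exist.
    public new_codec(SparkConf conf) {
        // read any codec-specific settings from conf here
    }

    @Override
    public OutputStream compressedOutputStream(OutputStream s) {
        return new DeflaterOutputStream(s); // placeholder compression
    }

    @Override
    public InputStream compressedInputStream(InputStream s) {
        return new InflaterInputStream(s); // placeholder decompression
    }
}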

Re: New Compression codec

Contributor

Thanks for the prompt response.

 

Standalone mode vs YARN mode: the very same application is executed in both scenarios. The only change is that the spark-submit invocation adds the --master yarn --deploy-mode cluster flags.
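
In other words, the YARN run is along these lines (same placeholder names as in my original post):

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.io.compression.codec=com.test.new_codec \
  --jars new_codec.jar \
  --class com.example.MyApp my_app.jar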

 

new_codec is a Java class which has a constructor accepting a SparkConf argument. Yes, it extends CompressionCodec.

 

- Given that standalone mode works fine, I would infer that the test application and the new_codec Java code are fine.

- Given that the standard lzf codec works fine in both modes (YARN and standalone), I would infer that the Spark and YARN installations are fine.

 

Your tips/suggestions welcome. Thanks.

 

 


Re: New Compression codec

Contributor

Another interesting tidbit: with some debug messages, I see that the same class is successfully found and instantiated a couple of times. It fails only on a subsequent attempt to instantiate.

Re: New Compression codec

Contributor

Any further thoughts or suggestions, please?

It's very strange that the class is found and instantiated the first couple of times, but an exception is thrown on the third occasion.

How do we explain this behavior?

- Maybe a visibility issue? But the class is public.

 

Thanks.

 

 

Re: New Compression codec

Contributor

I was able to fix this by changing the name of the Java class. That is, once I renamed the Java class and recreated the jar file, things worked smoothly.

If I recreate the jar file with the originally named Java class, the failure reappears.

I'm not yet sure how to explain this. Perhaps an older version of my jar file is conflicting with this class name, though I couldn't find one in any of the folders I looked in.
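
One check I can think of (a sketch; the class name is mine, the rest is plain JDK reflection) is to log where the JVM actually loads the class from, which should expose a stale duplicate jar if one is on the classpath:

public class WhereLoaded {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName("com.test.new_codec");
        // getCodeSource() can be null for JDK-internal classes, but for a
        // user jar it points at the jar the classloader actually picked up
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
        System.out.println(c.getClassLoader());
    }
}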

 

Although I have a working solution, I'm still curious about the reason for the failure. Your tips welcome.

 

Thanks.