I am experiencing a very strange problem where Kryo serialization works only once in the same Spark application.
That is, it works in the first step but fails in the second. At first this seemed to occur only when the application was invoked as part of an Oozie job; running the exact same code from a unit test worked as expected.
Running on a Cloudera Manager QuickStart VM CDH 5.8.0 single host:
Version: Cloudera Express 5.8.0 (#42 built by jenkins on 20160714-1246 git: d08ac14ff050a108864fab00205c12e0d4043132)
Java VM Name: Java HotSpot(TM) 64-Bit Server VM
Java VM Vendor: Oracle Corporation
Java Version: 1.8.0_101
The following options are passed to the SparkConf:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.sample.KryoRegistration");
In my custom KryoRegistration, I print some logging for debugging, and I can see that my serializer is used when the first step of the application runs. When the second step runs, however, Spark appears to use not Kryo but Java serialization:
Caused by: java.io.NotSerializableException: org.apache.hadoop.util.bloom.CountingBloomFilter
Serialization stack:
	- object not serializable (class: org.apache.hadoop.util.bloom.CountingBloomFilter, value: 2 1 1 5 5 5 2)
	- field (class: org.camp.scms.poc.ma.data.spark.EventBasedApp$5, name: val$filter, type: class org.apache.hadoop.util.bloom.CountingBloomFilter)
	- object (class org.camp.scms.poc.ma.data.spark.EventBasedApp$5, org.sample.MyApp$5@40b31a16)
	- field (class: org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
	- object (class org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 39 more
How is this possible?
Update: I was able to reproduce the problem with a smaller unit test, with no Oozie involved. It works fine when the object (a Bloom filter, in my case) is created and returned from an RDD map/reduce job, but it fails when that same object is later passed as input to a second Spark transformation. I can see that Kryo and my custom serializer are used while the Bloom filter is created and populated, but during the second step Spark appears to use Java serialization instead of Kryo and my custom serializer.
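For what it's worth, the shape of the exception above (ClosureCleaner.ensureSerializable calling JavaSerializer on a closure whose synthetic val$... field holds the filter) can be reproduced with the JDK alone, no Spark involved. The sketch below is only an illustration of that failure mode, not my actual code: NonSerializableFilter stands in for CountingBloomFilter, and SerializableFn stands in for Spark's Function interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    // Stand-in for CountingBloomFilter: note it does NOT implement Serializable.
    static class NonSerializableFilter {
        boolean membershipTest(int key) { return key % 2 == 0; }
    }

    // A Serializable function type, analogous to Spark's Java Function interface.
    interface SerializableFn extends Serializable {
        boolean apply(int key);
    }

    /** Tries to Java-serialize a closure that captures the filter;
     *  returns true if serialization succeeds, false on NotSerializableException. */
    static boolean serializeCapturedFilter() {
        final NonSerializableFilter filter = new NonSerializableFilter();

        // Anonymous class capturing `filter` as a synthetic val$filter field,
        // just like EventBasedApp$5 in the stack trace above.
        SerializableFn closure = new SerializableFn() {
            @Override public boolean apply(int key) {
                return filter.membershipTest(key);
            }
        };

        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure); // the same check ClosureCleaner performs
            return true;
        } catch (NotSerializableException e) {
            return false;             // the captured field is not Java-serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("closure serializable: " + serializeCapturedFilter());
    }
}
```

The closure itself implements Serializable, yet serializing it still fails, because plain Java serialization walks into the captured field and finds a class with no Serializable marker.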