Explorer
Posts: 6
Registered: ‎10-18-2016

Oozie Spark Kryo Serialization not consistent

Hello,

 

Summary:

 

I am experiencing a very strange problem where Kryo serialization works only once within the same Spark application.

That is, the first step works, but the second does not. This only seems to occur when the application is invoked as part of an Oozie job; I have run the exact same code from a unit test, and it works as expected.

 

Details:

 

Running on a Cloudera Manager QuickStart VM, CDH 5.8.0, single host:
Version: Cloudera Express 5.8.0 (#42 built by jenkins on 20160714-1246 git: d08ac14ff050a108864fab00205c12e0d4043132)

Java VM Name: Java HotSpot(TM) 64-Bit Server VM

Java VM Vendor: Oracle Corporation

Java Version: 1.8.0_101

 

The following options are passed to the SparkConf:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.sample.KryoRegistration");

In my custom KryoRegistration I print some logging for debugging, and I can see that my serializer is being used when the first step of the application runs. When the second step runs, however, it looks like Spark is NOT using Kryo serialization, but Java serialization:

 

Caused by: java.io.NotSerializableException: org.apache.hadoop.util.bloom.CountingBloomFilter
Serialization stack:
	- object not serializable (class: org.apache.hadoop.util.bloom.CountingBloomFilter, value: 2 1 1 5 5 5 2)
	- field (class: org.camp.scms.poc.ma.data.spark.EventBasedApp$5, name: val$filter, type: class org.apache.hadoop.util.bloom.CountingBloomFilter)
	- object (class org.camp.scms.poc.ma.data.spark.EventBasedApp$5, org.sample.MyApp$5@40b31a16)
	- field (class: org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
	- object (class org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
	... 39 more
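For what it's worth, the trace points at the closure serializer: `JavaSerializationStream` and `ClosureCleaner.ensureSerializable` use plain Java serialization for closures regardless of `spark.serializer`, and the anonymous `Function` has captured the `CountingBloomFilter` as the synthetic field `val$filter`. The capture failure can be reproduced without Spark at all; in this sketch, `FakeBloomFilter`, `MyFunction`, and `trySerialize` are hypothetical stand-ins for `CountingBloomFilter`, Spark's `Function` interface, and the closure check:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {
    // Stand-in for CountingBloomFilter: does NOT implement Serializable.
    static class FakeBloomFilter {}

    // Stand-in for org.apache.spark.api.java.function.Function,
    // which extends Serializable.
    interface MyFunction extends Serializable {
        boolean call(String s);
    }

    // Attempts Java serialization of the function, roughly what
    // ClosureCleaner.ensureSerializable does, and reports the result.
    static String trySerialize(MyFunction f) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(f);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return e.toString();
        }
    }

    public static void main(String[] args) {
        final FakeBloomFilter filter = new FakeBloomFilter();

        // The anonymous class captures 'filter' as a synthetic field
        // (val$filter), just like EventBasedApp$5 in the stack trace.
        MyFunction f = new MyFunction() {
            public boolean call(String s) { return filter != null; }
        };

        // Prints the NotSerializableException branch: the captured
        // FakeBloomFilter is not Serializable, so the closure is not either.
        System.out.println(trySerialize(f));
    }
}
```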

How is this possible?

TIA

Explorer
Posts: 6
Registered: ‎10-18-2016

Re: Oozie Spark Kryo Serialization not consistent

Update

 

I was able to reproduce this problem with a smaller unit test, with no Oozie involved. It works fine when the object (a Bloom filter in my case) is created and returned from an RDD map/reduce job, but it fails when the same object is later passed as input to a second Spark transformation. I can see Kryo and my custom serializer being used while the Bloom filter is created and populated, but during the second step Spark appears to fall back to Java serialization instead of Kryo and my custom serializer.
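Since the closure itself is always Java-serialized, one possible workaround is to make the captured object `Serializable`. `CountingBloomFilter` implements Hadoop's `Writable`, so a thin `Serializable` wrapper can delegate to `write`/`readFields`. The sketch below is an assumption, not a tested fix, and uses a hypothetical stand-in (`FakeFilter`) in place of `CountingBloomFilter` so it runs without Hadoop on the classpath:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFilterDemo {
    // Stand-in for CountingBloomFilter, which implements Writable
    // (write/readFields) but NOT java.io.Serializable.
    static class FakeFilter {
        int count;
        void write(DataOutput out) throws IOException { out.writeInt(count); }
        void readFields(DataInput in) throws IOException { count = in.readInt(); }
    }

    // Serializable wrapper: Java serialization of the closure succeeds
    // because writeObject/readObject delegate to the Writable-style API.
    static class SerializableFilter implements Serializable {
        transient FakeFilter filter;
        SerializableFilter(FakeFilter f) { filter = f; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            filter.write(out);      // ObjectOutputStream implements DataOutput
        }
        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            filter = new FakeFilter();
            filter.readFields(in);  // ObjectInputStream implements DataInput
        }
    }

    // Serializes and deserializes a wrapped filter, returning the
    // round-tripped count to show nothing is lost.
    static int roundTrip(int count) throws Exception {
        FakeFilter f = new FakeFilter();
        f.count = count;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new SerializableFilter(f));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return ((SerializableFilter) in.readObject()).filter.count;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(7));
    }
}
```

Another option (also an assumption, not tested here): pass the filter as a broadcast variable instead of capturing it in the closure, since broadcast data goes through `spark.serializer` (Kryo in this setup) rather than the Java closure serializer.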


 

Any idea?

 

 

Expert Contributor
Posts: 338
Registered: ‎01-25-2017

Re: Oozie Spark Kryo Serialization not consistent

@yeyo hey Yeyo, were you able to solve it?
