Created 01-26-2016 09:13 PM
I have a Storm topology with bolts that pass Avro GenericRecord objects, i.e. the new AvroGenericRecordBolt (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java) and a custom bolt which emits GenericRecords. When I run the topology in a single worker, everything is fine. When I run with multiple workers, I get the serialization errors below. I tried registering GenericData$Record with Kryo, but since Record doesn't implement Serializable that doesn't work either (as expected).
- Why does this error appear only when I have multiple workers?
- Any suggestions to get around this given that Record isn't Serializable?
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1077.invoke(disruptor.clj:94) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.util$async_loop$fn__551.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:na]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:na]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:na]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:na]
	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5386.invoke(worker.clj:139) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5107.invoke(executor.clj:263) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.disruptor$clojure_handler$reify__1064.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	... 6 common frames omitted
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_45]
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_45]
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
	... 16 common frames omitted
Created 01-26-2016 09:25 PM
When you are deploying to a single worker, there is no inter-worker communication, so there is no need to (de)serialize the data (everything is happening in the same JVM).
To work around the issue, you would have to implement your own Kryo serializer for the GenericData.Record class.
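For illustration, here is a minimal sketch of what such a Kryo serializer might look like, writing the schema JSON alongside the Avro binary encoding of the record. The class name AvroRecordSerializer and the wire layout (schema string, then length-prefixed bytes) are just one possible approach, not the only way to do it:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Hypothetical Kryo serializer for GenericData.Record: ships the schema JSON
// along with the Avro-encoded record bytes so the receiving worker can decode it.
public class AvroRecordSerializer extends Serializer<GenericData.Record> {

    @Override
    public void write(Kryo kryo, Output output, GenericData.Record record) {
        try {
            // Write the schema first so the reader side knows how to decode the payload.
            output.writeString(record.getSchema().toString());

            // Encode the record itself with Avro's binary encoding.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
            new GenericDatumWriter<GenericData.Record>(record.getSchema()).write(record, encoder);
            encoder.flush();

            byte[] payload = bytes.toByteArray();
            output.writeInt(payload.length, true);
            output.writeBytes(payload);
        } catch (IOException e) {
            throw new KryoException("Failed to serialize Avro record", e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
        try {
            // Rebuild the schema, then decode the Avro binary payload.
            Schema schema = new Schema.Parser().parse(input.readString());
            int length = input.readInt(true);
            byte[] payload = input.readBytes(length);

            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
            GenericDatumReader<GenericData.Record> reader =
                    new GenericDatumReader<GenericData.Record>(schema);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new KryoException("Failed to deserialize Avro record", e);
        }
    }
}

Sending the full schema with every tuple is wasteful for large schemas; a production version would typically cache schemas or send a fingerprint/ID instead.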
Created 01-26-2016 09:31 PM
Thanks @tgoetz, that makes sense. I have seen other deserialization errors in single worker topologies, which made this a bit surprising.
Created 01-27-2016 01:43 AM
FYI, with Storm 0.10.0-beta or later you can set 'topology.testing.always.try.serialize' to 'true' when testing your topology in local mode to force (de)serialization, and turn it off again when distributing work across multiple hosts.
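For reference, a rough sketch of setting that property in code (the string key is taken from the property name above; the class and method here are just an example of wiring it into your topology config):

import backtype.storm.Config;

public class LocalSerializationCheck {
    public static Config buildConf() {
        Config conf = new Config();
        // Force tuple (de)serialization even in local mode so Kryo problems
        // surface before the topology is deployed to multiple workers.
        conf.put("topology.testing.always.try.serialize", true);
        return conf;
    }
}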
Created 01-27-2016 02:13 AM
Great tip, @Jungtaek Lim! I might be missing something obvious, but my first attempt at registering a serializer seemed to have no effect at all.
conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);
I got precisely the same error message. It's possible that my serializer has a bug, but I didn't see any error related to the serializer itself.
Created 01-27-2016 02:38 AM
I clearly have a LOT more to learn, but adding this fixes the issue:
conf.setSkipMissingKryoRegistrations(false);
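Putting the two pieces together, the topology setup ends up looking roughly like this (AvroGenericSerializer stands in for whatever custom Kryo serializer you register; the wrapper class is only for illustration):

import backtype.storm.Config;
import org.apache.avro.generic.GenericData;

public class TopologyConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Tell Storm/Kryo how to handle Avro records instead of falling back to
        // Java serialization, which fails because Record is not Serializable.
        conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);
        // Do not silently skip Kryo registrations that cannot be loaded;
        // fail instead so a misconfigured serializer is visible.
        conf.setSkipMissingKryoRegistrations(false);
        return conf;
    }
}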
Created 05-14-2018 03:22 PM
Wondering why HDP 2.6 (i.e., Hadoop 2.7.3) ships with Avro 1.7.4, a version that does not support serialization. https://hadoop.apache.org/docs/r2.7.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/dependenc...
Created 05-14-2018 09:23 PM
To keep us on our toes!