- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Why does Storm fail with this deserialization error only when using multiple workers?
- Labels:
-
Apache Storm
Created ‎01-26-2016 09:13 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I have a Storm topology with bolts that pass Avro GenericRecord objects, i.e. the new AvroGenericRecordBolt (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java) and a custom bolt which emits GenericRecords. When I run the topology in a single worker, everything is fine. When I run with multiple workers, I get the serialization errors below. I tried registering GenericData$Record with kryo, but since Record doesn't implement Serializable that doesn't work either (as expected).
- Why does this error appear only when I have multiple workers?
- Any suggestions to get around this given that Record isn't Serializable?
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.disruptor$consume_loop_STAR_$fn__1077.invoke(disruptor.clj:94) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.util$async_loop$fn__551.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45] Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:na] at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:na] at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:na] at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:na] at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5386.invoke(worker.clj:139) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5107.invoke(executor.clj:263) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.disruptor$clojure_handler$reify__1064.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] ... 6 common frames omitted Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_45] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_45] at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1] ... 16 common frames omitted
Created ‎01-26-2016 09:25 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
When you are deploying to a single worker, there is no inter-worker communication, so there is no need to (de)serialize the data (everything is happening in the same JVM).
To work around the issue, you would have to implement your own kryo serializer for the GenericRecord.Record class.
Created ‎01-26-2016 09:25 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
When you are deploying to a single worker, there is no inter-worker communication, so there is no need to (de)serialize the data (everything is happening in the same JVM).
To work around the issue, you would have to implement your own kryo serializer for the GenericRecord.Record class.
Created ‎01-26-2016 09:31 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks @tgoetz, that makes sense. I have seen other deserialization errors in single worker topologies, which made this a bit surprising.
Created ‎01-27-2016 01:43 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
FYI, same or upper version of Storm 0.10.0-beta, you can set 'topology.testing.always.try.serialize' to 'true' when testing your topology in local to force (de)serialization, and set it off when distributing works to multiple hosts.
Created ‎01-27-2016 02:13 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Great tip, @Jungtaek Lim! I might be missing something obvious, but my first attempt at registering a serializer seemed to have no effect at all.
conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);
I got precisely the same error message. It's possible that my serializer has a bug, but I didn't see any error related to the serializer itself.
Created ‎01-27-2016 02:38 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I clearly have a LOT more to learn, but adding this fixes the issue:
conf.setSkipMissingKryoRegistrations(false);
Created ‎05-14-2018 03:22 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Wondering why HDP 2.6 (means HADOOP 2.7.3) comes with Avro 1.7.4 version which does not support Serialization. https://hadoop.apache.org/docs/r2.7.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/dependenc...
Created ‎05-14-2018 09:23 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
To keep us on our toes!
