Why does Storm fail with this serialization error only when using multiple workers?

Rising Star

I have a Storm topology with bolts that pass Avro GenericRecord objects, i.e. the new AvroGenericRecordBolt (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java) and a custom bolt which emits GenericRecords. When I run the topology in a single worker, everything is fine. When I run with multiple workers, I get the serialization errors below. I tried registering GenericData$Record with kryo, but since Record doesn't implement Serializable that doesn't work either (as expected).

- Why does this error appear only when I have multiple workers?

- Any suggestions to get around this given that Record isn't Serializable?

java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1077.invoke(disruptor.clj:94) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.util$async_loop$fn__551.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:na]
        at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5386.invoke(worker.clj:139) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5107.invoke(executor.clj:263) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$clojure_handler$reify__1064.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        ... 6 common frames omitted
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_45]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_45]
        at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        ... 16 common frames omitted
1 ACCEPTED SOLUTION

Explorer

When you are deploying to a single worker, there is no inter-worker communication, so there is no need to (de)serialize the data (everything is happening in the same JVM).

To work around the issue, you would have to implement your own Kryo serializer for the GenericData.Record class.
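
The explanation above can be illustrated with a minimal JDK-only sketch. This is not Storm code: `Reading` is a hypothetical stand-in for GenericData.Record (a class that does not implement Serializable), and `DataOutputStream` stands in for the output a real Kryo serializer would write to. It shows an in-JVM hand-off passing the same reference, Java serialization (which the stack trace shows Storm falling back to through SerializableSerializer) rejecting the object, and an explicit field-by-field codec round-tripping it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class SingleJvmVsWire {

    // Hypothetical stand-in for GenericData.Record: NOT Serializable.
    static final class Reading {
        final String sensor;
        final long value;

        Reading(String sensor, long value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    // Same JVM: the consumer gets the same object reference back,
    // so nothing is ever turned into bytes.
    static Reading handOffInMemory(Reading r) throws InterruptedException {
        BlockingQueue<Reading> queue = new ArrayBlockingQueue<>(1);
        queue.put(r);
        return queue.take();
    }

    // Between JVMs the object must become bytes. Java serialization
    // refuses classes that are not Serializable; returns true if rejected.
    static boolean javaSerializationRejects(Object o) throws IOException {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        }
    }

    // The idea behind a custom serializer: write each field explicitly...
    static byte[] encode(Reading r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(r.sensor);
            out.writeLong(r.value);
        }
        return bytes.toByteArray();
    }

    // ...and read the fields back in the same order on the other side.
    static Reading decode(byte[] data) throws IOException {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(data))) {
            return new Reading(in.readUTF(), in.readLong());
        }
    }

    public static void main(String[] args) throws Exception {
        Reading r = new Reading("s1", 42L);
        System.out.println("same reference in-JVM: " + (handOffInMemory(r) == r));
        System.out.println("Java serialization rejected: " + javaSerializationRejects(r));
        Reading copy = decode(encode(r));
        System.out.println("round trip: " + copy.sensor + "=" + copy.value);
    }
}
```

In a real topology, the encode/decode logic would presumably live in a Kryo Serializer subclass registered through conf.registerSerialization(...), as attempted later in this thread.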

Rising Star

Thanks @tgoetz, that makes sense. I have seen other deserialization errors in single worker topologies, which made this a bit surprising.

Contributor

FYI, in Storm 0.10.0-beta and later, you can set 'topology.testing.always.try.serialize' to 'true' when testing your topology in local mode to force (de)serialization, and turn it off when deploying to multiple hosts.
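
A small sketch of how that setting might be toggled, using only the JDK. The key string is the one quoted above; the class and method names are hypothetical. It assumes the overrides are later copied into the topology configuration, which works if Storm's Config is used as a map of string keys to values.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalSerializationCheck {

    // Key quoted in the post above; per that post, available from Storm 0.10.0-beta.
    static final String ALWAYS_TRY_SERIALIZE = "topology.testing.always.try.serialize";

    // Builds the override: true for local testing, false for cluster deployment.
    static Map<String, Object> overrides(boolean localTest) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ALWAYS_TRY_SERIALIZE, localTest);
        return conf;
    }

    public static void main(String[] args) {
        // With Storm on the classpath, these entries would be copied into the
        // topology configuration (for example with putAll) before submitting.
        System.out.println(overrides(true));
    }
}
```

Forcing serialization in local mode should surface a NotSerializableException like the one in the question before the topology ever runs on multiple workers.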

Rising Star

Great tip, @Jungtaek Lim! I might be missing something obvious, but my first attempt at registering a serializer seemed to have no effect at all.

conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);

I got precisely the same error message. It's possible that my serializer has a bug, but I didn't see any error related to the serializer itself.

Rising Star

I clearly have a LOT more to learn, but adding this fixes the issue:

conf.setSkipMissingKryoRegistrations(false);

Contributor

I wonder why HDP 2.6 (i.e. Hadoop 2.7.3) ships with Avro 1.7.4, which does not support serialization. https://hadoop.apache.org/docs/r2.7.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/dependenc...

Rising Star

To keep us on our toes!