Why does Storm fail with this deserialization error only when using multiple workers?

Contributor

I have a Storm topology with bolts that pass Avro GenericRecord objects: the new AvroGenericRecordBolt (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java) and a custom bolt that emits GenericRecords. When I run the topology in a single worker, everything is fine. When I run with multiple workers, I get the serialization errors below. I tried registering GenericData$Record with Kryo, but since Record doesn't implement Serializable, that doesn't work either (as expected).

- Why does this error appear only when I have multiple workers?

- Any suggestions to get around this given that Record isn't Serializable?

java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1077.invoke(disruptor.clj:94) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.util$async_loop$fn__551.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:41) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.21.jar:na]
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486) ~[kryo-2.21.jar:na]
        at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5386.invoke(worker.clj:139) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__5107.invoke(executor.clj:263) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.disruptor$clojure_handler$reify__1064.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        ... 6 common frames omitted
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_45]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_45]
        at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:38) ~[storm-core-0.9.3.2.2.4.12-1.jar:0.9.3.2.2.4.12-1]
        ... 16 common frames omitted
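
The bottom of the trace shows the root cause: Storm's fallback SerializableSerializer hands the tuple value to `java.io.ObjectOutputStream`, which rejects any class that does not implement `Serializable`. A minimal, self-contained sketch reproducing that behavior (the `Record` class here is just a stand-in for `GenericData$Record`, not the Avro class):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class NotSerializableDemo {
    // Stand-in for org.apache.avro.generic.GenericData$Record: a plain
    // class that does NOT implement java.io.Serializable.
    static class Record {
        int value = 42;
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Same call Storm's SerializableSerializer.write() ends up making.
            out.writeObject(new Record());
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```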
1 ACCEPTED SOLUTION

New Contributor

When you are deploying to a single worker, there is no inter-worker communication, so there is no need to (de)serialize the data (everything is happening in the same JVM).

To work around the issue, you would have to implement your own Kryo serializer for the GenericData.Record class.
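
A minimal sketch of such a serializer, assuming Avro 1.7.x and the Kryo 2.x bundled with storm-core 0.9.x (the class name `AvroRecordSerializer` is illustrative, not from this thread). It writes the schema as a string followed by the Avro binary encoding of the record, so the receiving worker can decode without a shared schema registry:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroRecordSerializer extends Serializer<GenericData.Record> {
    @Override
    public void write(Kryo kryo, Output output, GenericData.Record record) {
        try {
            // Ship the schema alongside the payload so the remote worker can decode.
            output.writeString(record.getSchema().toString());
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
            new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
            encoder.flush();
            byte[] payload = bytes.toByteArray();
            output.writeInt(payload.length, true);
            output.writeBytes(payload);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public GenericData.Record read(Kryo kryo, Input input, Class<GenericData.Record> type) {
        try {
            Schema schema = new Schema.Parser().parse(input.readString());
            byte[] payload = input.readBytes(input.readInt(true));
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
            return (GenericData.Record)
                    new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Writing the full schema with every tuple is wasteful for high-throughput topologies; a common refinement is to send a schema fingerprint and cache schemas per worker.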


7 REPLIES


Contributor

Thanks @tgoetz, that makes sense. I have seen other deserialization errors in single worker topologies, which made this a bit surprising.

Cloudera Employee

FYI, as of Storm 0.10.0-beta and later, you can set 'topology.testing.always.try.serialize' to 'true' to force (de)serialization when testing your topology in local mode, and turn it off when distributing work across multiple hosts.
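
A sketch of how that flag would be set (assuming the backtype.storm package names of the 0.9.x/0.10.x line; this is a config fragment, not a complete topology):

```java
import backtype.storm.Config;

Config conf = new Config();
// Storm 0.10.0-beta+: force tuple (de)serialization even in local mode,
// so serialization bugs surface before a multi-worker deployment.
conf.put("topology.testing.always.try.serialize", true);
```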

Contributor

Great tip, @Jungtaek Lim! I might be missing something obvious, but my first attempt at registering a serializer seemed to have no effect at all.

conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);

I got precisely the same error message. It's possible that my serializer has a bug, but I didn't see any error related to the serializer itself.

Contributor

I clearly have a LOT more to learn, but adding this fixes the issue:

conf.setSkipMissingKryoRegistrations(false);
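
Putting the two calls together (a sketch, assuming the poster's own `AvroGenericSerializer` class and the backtype.storm package names of Storm 0.9.x):

```java
import backtype.storm.Config;
import org.apache.avro.generic.GenericData;

Config conf = new Config();
// Register the custom Kryo serializer for Avro's GenericData.Record.
conf.registerSerialization(GenericData.Record.class, AvroGenericSerializer.class);
// By default, Storm skips registrations it cannot resolve in a worker and
// silently falls back to Java serialization; fail fast instead.
conf.setSkipMissingKryoRegistrations(false);
```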

Wondering why HDP 2.6 (i.e., Hadoop 2.7.3) ships with Avro 1.7.4, a version that does not support serialization. https://hadoop.apache.org/docs/r2.7.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/dependenc...

Contributor

To keep us on our toes!
