SAM application: "Unknown protocol id [12] received while deserializing the payload" received from the schema registry

Expert Contributor

I created a minimal SAM application that reads Avro messages from Kafka and writes them to Druid:

[Image: 19549-sam-temperature-humidity-app.png, screenshot of the SAM application]

The Avro schema for the data in the Kafka topic was previously added to the schema registry:

[Image: 19550-schema-registry.png, screenshot of the schema in the schema registry]

When I run the topology, the following error is thrown:

com.hortonworks.registries.schemaregistry.serde.SerDesException: Unknown protocol id [12] received while deserializing the payload
    at com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer.retrieveProtocolId(AvroSnapshotDeserializer.java:75)
    at com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer.retrieveProtocolId(AvroSnapshotDeserializer.java:32)
    at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:145)
    at com.hortonworks.streamline.streams.runtime.storm.spout.AvroKafkaSpoutTranslator.apply(AvroKafkaSpoutTranslator.java:61)
    at org.apache.storm.kafka.spout.KafkaSpout.emitTupleIfNotEmitted(KafkaSpout.java:335)
    at org.apache.storm.kafka.spout.KafkaSpout.emit(KafkaSpout.java:316)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:236)
    at org.apache.storm.daemon.executor$fn__5136$fn__5151$fn__5182.invoke(executor.clj:647)
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)

I took a look at the code that throws the SerDesException. The first byte of the input stream is supposed to contain the protocol version/ID:

protected byte retrieveProtocolId(InputStream inputStream) throws SerDesException {
    // first byte is protocol version/id.
    // protocol format:
    // 1 byte  : protocol version
    byte protocolId;
    try {
        protocolId = (byte) inputStream.read();
    } catch (IOException e) {
        throw new SerDesException(e);
    }

    if (protocolId == -1) {
        throw new SerDesException("End of stream reached while trying to read protocol id");
    }
    checkProtocolHandlerExists(protocolId);
    return protocolId;
}

private void checkProtocolHandlerExists(byte protocolId) {
    if (SerDesProtocolHandlerRegistry.get().getSerDesProtocolHandler(protocolId) == null) {
        throw new SerDesException("Unknown protocol id [" + protocolId + "] received while deserializing the payload");
    }
}
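To inspect a payload outside the topology, the same first-byte check can be sketched in isolation. This is only an illustration, not the registry's code: the class name `ProtocolIdPeek` and the `knownIds` parameter are hypothetical, and the set of valid ids is supplied by the caller because the text above does not list which ids the registry registers. The sketch tests for end of stream before narrowing the value to a byte.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Set;

// Hypothetical helper, not part of the schema registry.
final class ProtocolIdPeek {

    // Reads the first byte of the stream and checks it against caller-supplied ids.
    static byte readProtocolId(InputStream in, Set<Byte> knownIds) {
        int first;
        try {
            first = in.read(); // returns -1 at end of stream, 0..255 otherwise
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (first == -1) {
            throw new IllegalStateException("End of stream reached while trying to read protocol id");
        }
        byte id = (byte) first;
        if (!knownIds.contains(id)) {
            throw new IllegalStateException("Unknown protocol id [" + id + "]");
        }
        return id;
    }

    public static void main(String[] args) {
        // Assumption for the demo only: 1 is treated as the single known id.
        Set<Byte> known = Collections.singleton((byte) 1);
        byte[] payload = {12, 0, 0}; // starts with 12, like the failing message
        try {
            readProtocolId(new ByteArrayInputStream(payload), known);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: Unknown protocol id [12]
        }
    }
}
```

Running the check against the first bytes of a message dumped from the topic shows whether the payload carries a registry header at all.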

The first byte of the Avro inputstream appears to be a form-feed character (ASCII code 12):

[Image: 19551-first-byte-line-feed.png, screenshot showing the first byte of the payload]
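One possible explanation for the value 12, offered as an assumption because the schema fields and the payload bytes are not shown here: Avro's binary encoding writes ints and longs as zigzag-encoded varints and prefixes each string with its length in the same form. A raw Avro record whose first field is a six-character string, or an int/long equal to 6, would therefore begin with the byte 12. The sketch below implements that encoding; the class name `AvroZigZagSketch` is hypothetical.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of Avro's zigzag varint encoding for long values.
final class AvroZigZagSketch {

    static byte[] encodeLong(long n) {
        // Zigzag maps small magnitudes to small unsigned values: 0, -1, 1, -2 -> 0, 1, 2, 3
        long z = (n << 1) ^ (n >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Emit 7 bits per byte, low bits first, with the high bit set on all but the last byte
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A length or value of 6 encodes to the single byte 12
        System.out.println(encodeLong(6)[0]); // prints 12
    }
}
```

If that is what happened, the 12 is ordinary record data rather than a protocol id, which would mean the messages in the topic carry no schema registry header.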

Looking at the registry metastore, the only schema ID present is 2:

mysql> SELECT id, type, schemaGroup, name FROM registry.schema_metadata_info;
+----+------+-------------+----------------------+
| id | type | schemaGroup | name                 |
+----+------+-------------+----------------------+
|  2 | avro | Kafka       | temperature_humidity |
+----+------+-------------+----------------------+
1 row in set (0.00 sec)

I don't understand how the first byte of the Avro byte array could contain the ID for the schema registry unless it were created with a schema registry aware serializer.

Can you see what I'm doing wrong?

1 ACCEPTED SOLUTION

Contributor

SAM expects the messages in the topic to be serialized with the schema registry's Kafka Avro serializer, not written as raw Avro. Produce messages to the Kafka source's topic with KafkaAvroSerializer. See KafkaAvroSerDesApp for an example of producing to and consuming from a topic with the schema registry serializer and deserializer.
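As a rough sketch of what that producer configuration might look like: the snippet below only builds the properties and needs nothing beyond the JDK. The class name `RegistryAwareProducerConfig`, the broker address, and the registry URL are placeholders. The serializer class name and the `schema.registry.url` key are written from memory of the Hortonworks registry project, so check them against KafkaAvroSerDesApp for the installed version.

```java
import java.util.Properties;

// Hypothetical helper that assembles Kafka producer settings for a registry-aware serializer.
final class RegistryAwareProducerConfig {

    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed class name of the registry's serializer; verify against your registry version
        props.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        // Assumed config key telling the serializer where the schema registry lives
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; substitute the cluster's broker and registry endpoints
        Properties props = build("localhost:6667", "http://localhost:7788/api/v1");
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

These properties would be passed to a `KafkaProducer` whose values are Avro records. The serializer then prepends the protocol id and schema version information that the deserializer in SAM looks for.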
