Created on 07-24-2017 04:44 PM - edited 08-18-2019 12:23 AM
I created a minimal SAM application that reads Avro messages from Kafka and writes them to Druid:
The Avro schema for the data in the Kafka topic was previously added to the schema registry:
When I run the topology, the following error is thrown:
com.hortonworks.registries.schemaregistry.serde.SerDesException: Unknown protocol id [12] received while deserializing the payload
    at com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer.retrieveProtocolId(AvroSnapshotDeserializer.java:75)
    at com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer.retrieveProtocolId(AvroSnapshotDeserializer.java:32)
    at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:145)
    at com.hortonworks.streamline.streams.runtime.storm.spout.AvroKafkaSpoutTranslator.apply(AvroKafkaSpoutTranslator.java:61)
    at org.apache.storm.kafka.spout.KafkaSpout.emitTupleIfNotEmitted(KafkaSpout.java:335)
    at org.apache.storm.kafka.spout.KafkaSpout.emit(KafkaSpout.java:316)
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:236)
    at org.apache.storm.daemon.executor$fn__5136$fn__5151$fn__5182.invoke(executor.clj:647)
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
I took a look at the code that throws the SerDesException. It appears that the first byte of the Avro input stream is expected to contain the protocol version/id:
protected byte retrieveProtocolId(InputStream inputStream) throws SerDesException {
    // first byte is protocol version/id.
    // protocol format:
    // 1 byte : protocol version
    byte protocolId;
    try {
        protocolId = (byte) inputStream.read();
    } catch (IOException e) {
        throw new SerDesException(e);
    }

    if (protocolId == -1) {
        throw new SerDesException("End of stream reached while trying to read protocol id");
    }

    checkProtocolHandlerExists(protocolId);

    return protocolId;
}

private void checkProtocolHandlerExists(byte protocolId) {
    if (SerDesProtocolHandlerRegistry.get().getSerDesProtocolHandler(protocolId) == null) {
        throw new SerDesException("Unknown protocol id [" + protocolId + "] received while deserializing the payload");
    }
}
The first byte of the Avro inputstream appears to be a form-feed character (ASCII code 12):
Looking at the registry metastore, the only ID that exists is a 2:
mysql> SELECT id, type, schemaGroup, name FROM registry.schema_metadata_info;
+----+------+-------------+----------------------+
| id | type | schemaGroup | name                 |
+----+------+-------------+----------------------+
|  2 | avro | Kafka       | temperature_humidity |
+----+------+-------------+----------------------+
1 row in set (0.00 sec)
I don't understand how the first byte of the Avro byte array could contain the ID for the schema registry unless it were created with a schema registry aware serializer.
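To illustrate the point, here is a minimal sketch in plain Java (no registry or Avro libraries) of how a raw Avro payload could begin with byte 12. Avro encodes a string as a zig-zag varint length followed by the UTF-8 bytes, so a six-character string starts with 0x0C. The value "sensor", the class name FirstByteCheck, and the set of accepted protocol ids are all assumptions made for illustration. The actual schema and the registry's real id list are not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class FirstByteCheck {
    // Hypothetical ids a registry-aware deserializer might accept (assumption).
    static final Set<Byte> KNOWN_PROTOCOL_IDS =
            new HashSet<>(Arrays.asList((byte) 1, (byte) 2, (byte) 3, (byte) 4));

    // Avro binary encoding of a string: zig-zag varint length, then UTF-8 bytes.
    static byte[] encodeAvroString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        long len = utf8.length;
        long n = (len << 1) ^ (len >> 63); // zig-zag encode the length
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((n & ~0x7FL) != 0) {        // emit 7 bits at a time, low bits first
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Reads the first byte the same way the deserializer does: InputStream.read().
    static int firstByte(byte[] payload) {
        return new ByteArrayInputStream(payload).read();
    }

    // True only if the payload starts with one of the (assumed) protocol ids.
    static boolean looksRegistryFramed(byte[] payload) {
        int b = firstByte(payload);
        return b != -1 && KNOWN_PROTOCOL_IDS.contains((byte) b);
    }

    public static void main(String[] args) {
        byte[] raw = encodeAvroString("sensor"); // hypothetical first field value
        System.out.println(firstByte(raw));           // prints 12
        System.out.println(looksRegistryFramed(raw)); // prints false
    }
}
```

If that is what is happening, the 12 is ordinary field data rather than a protocol id, which would be consistent with the topic holding raw Avro with no registry header.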
Can you see what I'm doing wrong?
Created 08-08-2017 01:52 PM
SAM expects messages in the topic to be serialized with the Kafka Avro serializer rather than written as raw Avro. You should produce messages to the Kafka source's topic with KafkaAvroSerializer. See KafkaAvroSerDesApp for an example of producing and consuming messages to and from a topic using the schema registry serializer/deserializer.
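As a rough sketch of what the producer side might look like, the snippet below builds the producer properties using only java.util.Properties. The fully qualified serializer class name and the schema.registry.url key are what I recall from the registry project, so please verify them against your installed version. The broker address and registry URL are placeholders, and SamProducerConfig is a hypothetical helper name.

```java
import java.util.Properties;

class SamProducerConfig {
    // Builds Kafka producer settings that use the registry-aware Avro serializer.
    static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed class name of the schema registry serializer; check your version.
        props.setProperty("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        // Assumed config key telling the serializer where the registry lives.
        props.setProperty("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder endpoints; replace with your own broker and registry.
        Properties props = build("localhost:6667", "http://localhost:7788/api/v1");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to a KafkaProducer. With this setup the serializer, not your own code, writes the protocol id and schema version information ahead of the Avro body, which is what the SAM Kafka source expects to find.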
Created 08-08-2017 02:00 PM