Support Questions

Find answers, ask questions, and share your expertise

Using ConsumeKafkaRecord_2_6 to deserialize key, value message with schemas in confluent schema registry

avatar
New Contributor

I am using Nifi 1.14.1, Kafka 2.13-2.7.1 and the processor ConsumeKafkaRecord_2_6 to process messages from a topic where the key and the value where both serialized using avro - schemas for the key and value are stored in the confluent schema registry. But the processor fails to parse the message because there is not a way - that I can see - to specify that both key and value are avro serialized with schemas stored in the confluent schema registry. The convention for naming the schema is usually [topic name]-value and [topic name]-key. I can read the messages just fine using kcat, formerly kafkacat using:

 

kcat -b broker1:9092,broker2:9092,broker3:9092 -t mytopic -s avro -r http://schema-registry_url.com -p 0

Is there a way to read such messages or am I supposed to add my own processor to nifi? Here's an error trace:

[Timer-Driven Process Thread-9] o.a.n.p.k.pubsub.ConsumeKafkaRecord_2_6 ConsumeKafkaRecord_2_6[id=f9933326-017e-1000-60be-e5077975f031] Failed to parse message fconfigured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
 ↳ causes: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
 org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
 	at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
 	at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
 	at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 	at java.lang.reflect.Method.invoke(Method.java:498)
 	at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
 	at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
 	at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
 	at com.sun.proxy.$Proxy192.nextRecord(Unknown Source)
 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:549)
 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$3(ConsumerLease.java:342)
 	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:329)
 	at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:188)
 	at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.onTrigger(ConsumeKafkaRecord_2_6.java:472)
 	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1202)
 	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
 	at org.apache.nifi.controller.scheduling.QuartzSchedulingAgent$2.run(QuartzSchedulingAgent.java:137)
 	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 	at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
 	at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
 	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
 	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
 	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
 	at org.apache.nifi.avro.NonCachingDatumReader.readString(NonCachingDatumReader.java:51)
 	at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335)
 	at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321)
 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
 	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
 	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
 	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
 	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
 	at org.apache.nifi.avro.AvroReaderWithExplicitSchema.nextAvroRecord(AvroReaderWithExplicitSchema.java:92)
 	at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:39)
 	... 27 common frames omitted

ConsumeRecord_1_2.pngConsumeRecord_2_2.pngAvroReader.pngSchemaRegistry.png

@bbende @mburgess, @MattWho 

1 ACCEPTED SOLUTION

avatar
New Contributor

It turns out that I only need to specify that I am using "Confluent Content-Encoded Schema Reference" in the Avro Reader "Schema Access Strategy" field, in this case since I am following the schema registry conventions.

View solution in original post

2 REPLIES 2

avatar
New Contributor

It turns out that I only need to specify that I am using "Confluent Content-Encoded Schema Reference" in the Avro Reader "Schema Access Strategy" field, in this case since I am following the schema registry conventions.

avatar
Master Collaborator

Hello @Mais - Were you able to deserialise and consume both key & value ?
In my case I am able to get deserialised value but dont see key anywehere!