
java.lang.ArrayIndexOutOfBoundsException while reading avroReader


I am using a CDC (change data capture) tool with Confluent Kafka 3.2.1 and the Schema Registry on the event producer side. On the consumer side I use the NiFi ConsumeKafkaRecord_0_10 processor to consume the Avro messages from Kafka. To read them I created an AvroReader with the Avro schema registry plugged in, but when it reads the messages off Kafka I get the exception below.

When I use the Confluent-provided Avro deserializer to read the messages off the topic with the command below, I don't see any exception. My first suspicion was that the messages on the topic were corrupt, but that is not the case.

bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url="http://localhost:8081" --topic kaf_poc.poc_kafka.sourcedb.dbas.emp_nopk --from-beginning

2017-07-11 01:00:49,939 ERROR [Timer-Driven Process Thread-3] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=2f997e79-015d-1000-398e-65916520b3f7] Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship: java.lang.ArrayIndexOutOfBoundsException: 32
java.lang.ArrayIndexOutOfBoundsException: 32
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.nifi.avro.AvroReaderWithExplicitSchema.nextAvroRecord(AvroReaderWithExplicitSchema.java:61)
    at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:36)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:434)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1540)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
    at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

1 REPLY


You currently can't use ConsumeKafkaRecord_0_10 to consume Confluent Avro. Confluent Avro is a special Avro format that carries additional information ahead of the Avro data, so regular Avro readers cannot read it.
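
To make the "additional information" concrete, here is a minimal sketch in Python, assuming the documented Confluent wire format: one magic byte (0), then a 4-byte big-endian schema ID, then the plain Avro binary payload. The function name `split_confluent_frame` and the sample bytes are hypothetical and only for illustration; this is not how NiFi implements it.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0    # assumption: Confluent wire format starts with a 0 byte
HEADER_SIZE = 5   # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message too short to carry a Confluent header")
    # ">bI" = big-endian, one signed byte followed by one unsigned 32-bit int
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


# Hypothetical message: schema ID 21 followed by three bytes of Avro data.
framed = b"\x00" + (21).to_bytes(4, "big") + b"\x02\x06\x08"
schema_id, payload = split_confluent_frame(framed)
print(schema_id, payload)
```

A regular Avro reader has no notion of this header and starts decoding at the first byte, so it interprets the header bytes as record data. That is consistent with a decoding failure like the one in the stack trace above.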

On the master branch of Apache NiFi there is support for integration with Confluent. There will be a new "Schema Access Strategy" option, "Confluent Content-Encoded Schema Reference", which will allow the reader to read Confluent Avro.