
Flume fails to retrieve Avro Schema

I am trying to use the Kafka Schema Registry with the Flume HDFS sink. I can retrieve the schema with a curl command, and I can write data into my Kafka topic with the kafka-avro-console-producer. Flume sees the event but then throws the error below. I have tried several configurations and none of them work. Please help.

process failed
org.apache.flume.FlumeException: Could not find schema for event [Event headers = {timestamp=1510603952954, topic=customerdata2, partition=1}, body.length = 11 ]

Below is my Flume configuration, including the Schema Registry URL.

# Sources, channels, and sinks are defined per
# agent name, in this case tier1.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.deserializer.schemaType = LITERAL
tier1.sources.source1.zookeeperConnect = zk-nonprod:2181
tier1.sources.source1.topic =customerdata2
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1


tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.channels.channel1.parseAsFlumeEvent = false
tier1.channels.channel1.kafka.consumer.auto.offset.reset = earliest

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
tier1.sinks.sink1.hdfs.kerberosPrincipal=$KERBEROS_PRINCIPAL
tier1.sinks.sink1.hdfs.kerberosKeytab=$KERBEROS_KEYTAB
tier1.sinks.sink1.hdfs.path = hdfs://cdh-namenode:8020/tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.serializer = avro_event
tier1.sinks.sink1.hdfs.fileSuffix = .avro
tier1.sinks.sink1.hdfs.schema.registry.url = http://kafka-util:8081
tier1.sinks.sink1.channel = channel1
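
One direction that may be worth testing, sketched under assumptions rather than confirmed: as far as the Flume user guide describes it, org.apache.flume.sink.hdfs.AvroEventSerializer looks for the schema in the event headers flume.avro.schema.url or flume.avro.schema.literal, or in a static serializer.schemaURL setting, and the event in the error above carries only timestamp, topic, and partition. The guide does not list hdfs.schema.registry.url as an HDFS sink property, so it is probably ignored. The .avsc path below is hypothetical and would need to point at a copy of the schema that the agent can read.

```properties
# Sketch only: give the serializer a static schema location instead of relying on headers.
# The serializer is set through the "serializer" key (no "hdfs." prefix).
tier1.sinks.sink1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

# Hypothetical path: replace with wherever the customerdata2 schema file actually lives.
tier1.sinks.sink1.serializer.schemaURL = hdfs://cdh-namenode:8020/path/to/customerdata2.avsc
```

Records written by the kafka-avro-console-producer also start with a 5-byte Schema Registry prefix (a magic byte plus a 4-byte schema ID), so the event body may not be plain Avro even once a schema is found; that part is not addressed by this sketch.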
