New Contributor
Posts: 3
Registered: ‎12-10-2018

Flume Avro custom schema - Malformed data

I need to warehouse Avro data flowing through Kafka and produce HDFS files that can be queried by putting Hive tables on top of them. I'm using Flume to move the data from Kafka to HDFS. Things seem to work, except that when I try to query the data I get a Malformed data exception.
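For context, a Hive external table over the sink's output directory would look roughly like this; a minimal sketch where the table name, location, and schema URL are placeholders, not taken from the original post:

```sql
-- Hypothetical Hive DDL; table name, location, and schema URL are placeholders.
-- With the AvroSerDe, columns can be omitted and are derived from the schema.
CREATE EXTERNAL TABLE conv_data
PARTITIONED BY (hitdate STRING)
STORED AS AVRO
LOCATION '/tmp/flume/flumeTest'
TBLPROPERTIES ('avro.schema.url' = 'hdfs:///SchemaFileLocation.avsc');
```

Note that each `hitdate=YYYYMMDD` directory Flume creates still has to be registered with `ALTER TABLE ... ADD PARTITION` (or `MSCK REPAIR TABLE`) before its data is visible to queries.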


Sample flume config file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = convTopic, flumeTest

# hdfs sink to persist data
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=20181129
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro

# Roll files in HDFS every 60 seconds or at 255 MB; don't roll based on number of records
# We roll at 255 MB because our block size is 128 MB; we want two full blocks without going over
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 267386880
a1.sinks.k1.hdfs.rollCount = 0
# Write to HDFS file in batches of 20 records
a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///SchemaFileLocation.avsc

# The following property gives a generic avro_event container, which I can read
# fine through an Avro table; the custom schema is where I have issues
#a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
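One way to narrow down a Malformed data error is to check whether the files Flume wrote are genuine Avro object container files (magic bytes `Obj\x01` followed by a header that embeds the writer schema) rather than bare serialized datums. A minimal sketch using only the Python standard library; the function names are mine, and it assumes you've pulled a file down locally (e.g. with `hdfs dfs -get`):

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"  # every Avro object container file starts with these 4 bytes


def _read_long(buf):
    """Decode one zigzag/varint-encoded Avro long from a binary stream."""
    shift, accum = 0, 0
    while True:
        byte = buf.read(1)[0]
        accum |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def _read_bytes(buf):
    """Decode one length-prefixed Avro byte string."""
    return buf.read(_read_long(buf))


def read_avro_schema(data):
    """Return the writer schema embedded in an Avro container file header,
    or None if the bytes are not a container file (e.g. raw datums written
    without container framing, which Hive surfaces as 'Malformed data')."""
    buf = io.BytesIO(data)
    if buf.read(4) != AVRO_MAGIC:
        return None
    meta = {}
    count = _read_long(buf)
    while count != 0:
        if count < 0:           # negative count: a block byte-size long follows
            count = -count
            _read_long(buf)
        for _ in range(count):
            key = _read_bytes(buf).decode("utf-8")
            meta[key] = _read_bytes(buf)
        count = _read_long(buf)
    return json.loads(meta["avro.schema"])
```

If `read_avro_schema` returns `None`, the sink is not writing container files at all; if it returns a schema that differs from the one the Hive table expects, that mismatch is the likely source of the exception.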


Has anyone faced this issue? If so, can you please point me in the right direction?
