
Flume Avro custom schema - Malformed data

New Contributor

I need to warehouse Avro data from Kafka and produce HDFS files that can be queried through Hive tables. I'm using Flume to move the data from Kafka into HDFS files. Things seem to work, except that when I try to query the data I get a Malformed data exception.

 

Sample Flume config file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = localhost:9092
a1.sources.r1.kafka.topics = convTopic
a1.sources.r1.kafka.consumer.group.id = flumeTest

# hdfs sink to persist data
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /tmp/flume/%{topic}/hitdate=20181129
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.fileSuffix = .avro

# Roll files in HDFS every 60 seconds or at 255MB; don't roll based on number of records
# We roll at 255MB because our block size is 128MB, we want 2 full blocks without going over
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 267386880
a1.sinks.k1.hdfs.rollCount = 0
# Write to HDFS file in batches of 20 records
a1.sinks.k1.hdfs.batchSize = 20

a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///SchemaFileLocation.avsc

# The following property gives a generic Avro event, and I can read that
# fine using an Avro table, but the custom schema is where I have issues
#a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
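 

For context, my understanding is that this serializer writes each Flume event body into the Avro container file as a binary-encoded datum, so the Kafka messages themselves need to be raw binary Avro matching the schema. A simplified sketch of the producer side (this assumes the kafka-python and avro libraries; the record field names are placeholders for my actual schema):

import io

import avro.schema
from avro.io import BinaryEncoder, DatumWriter
from kafka import KafkaProducer  # kafka-python

# Parse the same schema file the Flume serializer points at
schema = avro.schema.parse(open("SchemaFileLocation.avsc").read())
writer = DatumWriter(schema)

def encode(record):
    # Binary-encode a single datum (no container header) -- this is the
    # raw byte form the Flume event body is expected to carry
    buf = io.BytesIO()
    writer.write(record, BinaryEncoder(buf))
    return buf.getvalue()

producer = KafkaProducer(bootstrap_servers="localhost:9092")
# Field names here are placeholders; they must match the .avsc exactly
producer.send("convTopic", encode({"id": 1, "name": "test"}))
producer.flush()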

 

Has anyone faced this issue? If so, can you please point me in the right direction?
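
One check I plan to try: copy a rolled file out of HDFS and read it back with the Avro Python library, to confirm whether the container itself is valid before involving Hive (the file name below is just an example; FlumeData is Flume's default hdfs.filePrefix):

from avro.datafile import DataFileReader
from avro.io import DatumReader

# First copy a rolled file locally, e.g.:
#   hdfs dfs -get /tmp/flume/convTopic/hitdate=20181129/FlumeData.1543512345678.avro .
# (the timestamp in the name is an example; adjust to the actual file)
reader = DataFileReader(open("FlumeData.1543512345678.avro", "rb"), DatumReader())
for record in reader:
    print(record)
reader.close()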

1 REPLY

Re: Flume Avro custom schema - Malformed data

New Contributor

I am also facing the same issue. Did you get any solution? If yes, please let me know.
