Flume Avro File Incorrect Schema

Hi Friends,

 

I need some help with Avro file processing using Flume and Kafka.
In short, I am reading a JSON file, using an interceptor and a multiplexing selector to split specific values across channels into an Avro sink,
and then a second agent reads from that Avro source and writes to HDFS as an Avro file.

 

The Flume configurations I am using are given below.

 

The problem I am facing is that the Avro file is being written to HDFS with only "header" and "body" fields.
So when I run "java -jar avro-tools-1.8.2.jar getschema" on the file, I am not getting the desired schema of the Avro file;
every time it shows only "header" and "body" as the two fields.
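
For reference, what getschema prints here looks like Flume's stock event schema, something along these lines (default serializer field names, from memory):

{
  "type" : "record",
  "name" : "Event",
  "fields" : [
    { "name" : "headers", "type" : { "type" : "map", "values" : "string" } },
    { "name" : "body", "type" : "bytes" }
  ]
}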

 

Please suggest how to resolve this problem. It is very urgent.


1.

flume-ng agent --name ta1 --conf conf --conf-file /home/cloudera/santanu/flume_interceptor_multiplexing.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with Json Source , Kafka and Memory Channels , Avro Sink
ta1.sources = twitter
ta1.sinks = avrofile
ta1.channels = memchannel kafkachannel

## Properties
## Sources
ta1.sources.twitter.type = exec
ta1.sources.twitter.command = tail -F /home/cloudera/workspace/Cloudera_Share/Bigdata.json


## Sinks
ta1.sinks.avrofile.type = avro
ta1.sinks.avrofile.hostname = 192.XXX.x.x
ta1.sinks.avrofile.port = 4141

## Channel 1
ta1.channels.memchannel.type = memory 
ta1.channels.memchannel.capacity = 5000
ta1.channels.memchannel.transactionCapacity = 500

## Channel 2
ta1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
ta1.channels.kafkachannel.kafka.bootstrap.servers = 192.XXX.x.x:9092
ta1.channels.kafkachannel.kafka.topic = MyTopic_1
ta1.channels.kafkachannel.kafka.consumer.group.id = my_group

## Interceptor
ta1.sources.twitter.interceptors = i1
ta1.sources.twitter.interceptors.i1.type = regex_extractor
ta1.sources.twitter.interceptors.i1.regex = (?i)(Python|Java|Scala|Perl|Sqoop|Flume|Kafka|Hive|Spark|Jethro|NoSQL)
ta1.sources.twitter.interceptors.i1.serializers = s1
ta1.sources.twitter.interceptors.i1.serializers.s1.name = BigData

## Source Selector
ta1.sources.twitter.selector.type = multiplexing
ta1.sources.twitter.selector.header = BigData
ta1.sources.twitter.selector.mapping.Python = kafkachannel
ta1.sources.twitter.selector.mapping.Java = kafkachannel
ta1.sources.twitter.selector.mapping.Scala = kafkachannel
ta1.sources.twitter.selector.mapping.Perl = kafkachannel
ta1.sources.twitter.selector.mapping.Sqoop = memchannel
ta1.sources.twitter.selector.mapping.Flume = memchannel
ta1.sources.twitter.selector.mapping.Kafka = memchannel
ta1.sources.twitter.selector.mapping.Hive = memchannel
ta1.sources.twitter.selector.mapping.Spark = memchannel
ta1.sources.twitter.selector.mapping.Jethro = memchannel
ta1.sources.twitter.selector.mapping.NoSQL = memchannel

## Mapping
ta1.sources.twitter.channels = kafkachannel memchannel
ta1.sinks.avrofile.channel = memchannel
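
As a sanity check on the selector routing above, the Kafka leg can be inspected with the stock console consumer (a sketch; older Kafka releases take --zookeeper instead of --bootstrap-server):

kafka-console-consumer --bootstrap-server 192.XXX.x.x:9092 --topic MyTopic_1 --from-beginning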


2.

flume-ng agent --name ta2 --conf conf --conf-file /home/cloudera/santanu/flume_avro_hdfs.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with Avro Source and HDFS Sink
ta2.sources = avrofile
ta2.sinks = hdfsfile
ta2.channels = memchannel

## Properties
## Source
ta2.sources.avrofile.type = avro
ta2.sources.avrofile.bind = 192.XXX.x.x
ta2.sources.avrofile.port = 4141

## Sink
ta2.sinks.hdfsfile.type = hdfs
ta2.sinks.hdfsfile.hdfs.path = /user/cloudera/flume_avro
ta2.sinks.hdfsfile.hdfs.filePrefix = Hadoop
ta2.sinks.hdfsfile.hdfs.fileSuffix = .avro
ta2.sinks.hdfsfile.hdfs.fileType = DataStream
ta2.sinks.hdfsfile.hdfs.writeFormat = Text
ta2.sinks.hdfsfile.hdfs.rollInterval = 5
ta2.sinks.hdfsfile.serializer = avro_event
ta2.sinks.hdfsfile.serializer.compressionCodec = snappy

## Channel
ta2.channels.memchannel.type = memory
ta2.channels.memchannel.capacity = 5000
ta2.channels.memchannel.transactionCapacity = 500

## Interceptor
ta2.sources.avrofile.interceptors = i2
ta2.sources.avrofile.interceptors.i2.type = remove_header
ta2.sources.avrofile.interceptors.i2.withName = BigData

## Mapping
ta2.sources.avrofile.channels = memchannel
ta2.sinks.hdfsfile.channel = memchannel
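
Once a file rolls, it can be pulled down and inspected with the same avro-tools jar (<timestamp> below is a placeholder for whatever name the sink generates; tojson dumps the records):

hdfs dfs -get /user/cloudera/flume_avro/Hadoop.<timestamp>.avro .
java -jar avro-tools-1.8.2.jar getschema Hadoop.<timestamp>.avro
java -jar avro-tools-1.8.2.jar tojson Hadoop.<timestamp>.avro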

3. Avro file in HDFS with Header and Body as 2 columns (screenshot: Flume_Avro_Schema_2.JPG)

Thanking you
Santanu

1 REPLY

It seems that using the avro_event serializer gives you only Flume's default event schema (a headers map and a body).

 

You might want to use a custom serializer with your own schema instead to solve your problem:

a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///LinkToAvroSchema.avsc
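
For example, with a minimal schema file saved locally (record and field names here are only placeholders, adjust them to your data), say as /home/cloudera/santanu/bigdata.avsc:

{
  "type" : "record",
  "name" : "BigData",
  "fields" : [ { "name" : "keyword", "type" : "string" } ]
}

and then in your second agent:

ta2.sinks.hdfsfile.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
ta2.sinks.hdfsfile.serializer.schemaURL = file:///home/cloudera/santanu/bigdata.avsc

Note that, as far as I know, this serializer expects each event body to already be Avro binary conforming to that schema; the schema can also be supplied per event via the flume.avro.schema.literal or flume.avro.schema.url headers instead of schemaURL.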