Flume Avro File Incorrect Schema

Hi friends,

I need help with Avro file processing using Flume and Kafka.
In short, I am reading a JSON file, using an interceptor and a channel selector to route events with specific values to an Avro sink,
and then reading from the matching Avro source to write the events to HDFS as an Avro file.

The Flume configurations I am using are given below.

The problem is that the Avro file is written to HDFS with only a header and a body.
When I run "java -jar avro-tools-1.8.2.jar getschema" on the file, I do not get the desired schema.
Every time it shows only "header" and "body" as the 2 fields.

Please suggest how to resolve this problem. It is very urgent.


1.

flume-ng agent --name ta1 --conf conf --conf-file /home/cloudera/santanu/flume_interceptor_multiplexing.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with Json Source , Kafka and Memory Channels , Avro Sink
ta1.sources = twitter
ta1.sinks = avrofile
ta1.channels = memchannel kafkachannel

## Properties
## Sources
ta1.sources.twitter.type = exec
ta1.sources.twitter.command = tail -F /home/cloudera/workspace/Cloudera_Share/Bigdata.json


## Sinks
ta1.sinks.avrofile.type = avro
ta1.sinks.avrofile.hostname = 192.XXX.x.x
ta1.sinks.avrofile.port = 4141

## Channel 1
ta1.channels.memchannel.type = memory 
ta1.channels.memchannel.capacity = 5000
ta1.channels.memchannel.transactionCapacity = 500

## Channel 2
ta1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
ta1.channels.kafkachannel.kafka.bootstrap.servers = 192.XXX.x.x:9092
ta1.channels.kafkachannel.kafka.topic = MyTopic_1
ta1.channels.kafkachannel.kafka.consumer.group.id = my_group

## Interceptor
ta1.sources.twitter.interceptors = i1
ta1.sources.twitter.interceptors.i1.type = regex_extractor
ta1.sources.twitter.interceptors.i1.regex = (?i)(Python|Java|Scala|Perl|Sqoop|Flume|Kafka|Hive|Spark|Jethro|NoSQL)
ta1.sources.twitter.interceptors.i1.serializers = s1
ta1.sources.twitter.interceptors.i1.serializers.s1.name = BigData

## Source Selector
ta1.sources.twitter.selector.type = multiplexing
ta1.sources.twitter.selector.header = BigData
ta1.sources.twitter.selector.mapping.Python = kafkachannel
ta1.sources.twitter.selector.mapping.Java = kafkachannel
ta1.sources.twitter.selector.mapping.Scala = kafkachannel
ta1.sources.twitter.selector.mapping.Perl = kafkachannel
ta1.sources.twitter.selector.mapping.Sqoop = memchannel
ta1.sources.twitter.selector.mapping.Flume = memchannel
ta1.sources.twitter.selector.mapping.Kafka = memchannel
ta1.sources.twitter.selector.mapping.Hive = memchannel
ta1.sources.twitter.selector.mapping.Spark = memchannel
ta1.sources.twitter.selector.mapping.Jethro = memchannel
ta1.sources.twitter.selector.mapping.NoSQL = memchannel

## Mapping
ta1.sources.twitter.channels = kafkachannel memchannel
ta1.sinks.avrofile.channel = memchannel


2.

flume-ng agent --name ta2 --conf conf --conf-file /home/cloudera/santanu/flume_avro_hdfs.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with Avro Source and HDFS Sink
ta2.sources = avrofile
ta2.sinks = hdfsfile
ta2.channels = memchannel

## Properties
## Source
ta2.sources.avrofile.type = avro
ta2.sources.avrofile.bind = 192.XXX.x.x
ta2.sources.avrofile.port = 4141

## Sink
ta2.sinks.hdfsfile.type = hdfs
ta2.sinks.hdfsfile.hdfs.path = /user/cloudera/flume_avro
ta2.sinks.hdfsfile.hdfs.filePrefix = Hadoop
ta2.sinks.hdfsfile.hdfs.fileSuffix = .avro
ta2.sinks.hdfsfile.hdfs.fileType = DataStream
ta2.sinks.hdfsfile.hdfs.writeFormat = Text
ta2.sinks.hdfsfile.hdfs.rollInterval = 5
ta2.sinks.hdfsfile.serializer = avro_event
ta2.sinks.hdfsfile.serializer.compressionCodec = snappy

## Channel
ta2.channels.memchannel.type = memory
ta2.channels.memchannel.capacity = 5000
ta2.channels.memchannel.transactionCapacity = 500

## Interceptor
ta2.sources.avrofile.interceptors = i2
ta2.sources.avrofile.interceptors.i2.type = remove_header
ta2.sources.avrofile.interceptors.i2.withName = BigData

## Mapping
ta2.sources.avrofile.channels = memchannel
ta2.sinks.hdfsfile.channel = memchannel

3. Avro file in HDFS with header and body as 2 columns (attached screenshot: Flume_Avro_Schema_2.JPG)

Thank you,
Santanu


Re: Flume Avro File Incorrect Schema


It seems that the avro_event serializer always writes Flume's default event schema, which is why the file contains only the header and body fields.

You might want to use the AvroEventSerializer with your own schema instead to solve your problem:

a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///LinkToAvroSchema.avsc
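
Applied to the ta2 agent from the question, the suggestion above might look like the sketch below. This is only an illustration under a few assumptions: the schema path /home/cloudera/santanu/bigdata.avsc and the interceptor name "schema" are hypothetical, and whether the schemaURL property is honoured may depend on the Flume version in use. Note also that, as far as I know, this serializer reads each event body as an already Avro-encoded datum matching the schema, so plain JSON lines from the exec source would need to be converted before they reach the sink.

```
## Sink: only the serializer lines differ from the question
ta2.sinks.hdfsfile.type = hdfs
ta2.sinks.hdfsfile.hdfs.path = /user/cloudera/flume_avro
ta2.sinks.hdfsfile.hdfs.fileSuffix = .avro
ta2.sinks.hdfsfile.hdfs.fileType = DataStream
ta2.sinks.hdfsfile.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
## Hypothetical schema location; point this at your real .avsc file
ta2.sinks.hdfsfile.serializer.schemaURL = file:///home/cloudera/santanu/bigdata.avsc

## Alternative (commented out): pass the schema location in an event header
## using a static interceptor. "schema" is a hypothetical interceptor name and
## must be added to any interceptor list the source already has.
# ta2.sources.avrofile.interceptors = schema
# ta2.sources.avrofile.interceptors.schema.type = static
# ta2.sources.avrofile.interceptors.schema.key = flume.avro.schema.url
# ta2.sources.avrofile.interceptors.schema.value = file:///home/cloudera/santanu/bigdata.avsc
```

After restarting the agent, running "java -jar avro-tools-1.8.2.jar getschema" on a newly rolled file should show whether the custom schema was picked up.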