Created on 09-28-2018 04:21 AM - edited 09-16-2022 06:45 AM
Hi Friends,
I need some help with Avro file processing using Flume and Kafka.
In short, I am reading a JSON file, using an interceptor and a selector to split specific values into an Avro sink,
and then reading from that Avro source to write the data to HDFS as an Avro file.
The Flume configurations I am using are given below.
The problem I am facing is that the Avro file is written to HDFS with only "header" and "body".
So when I run "java -jar avro-tools-1.8.2.jar getschema" I do not get the desired schema of the Avro file.
Every time it shows only "header" and "body" as the two fields.
Please suggest how to resolve this problem. It is quite urgent.
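For reference, the schema getschema prints looks like the generic Flume event wrapper below (an illustrative sketch of the default avro_event schema, not my actual output):

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : { "type" : "map", "values" : "string" }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}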
1. Agent 1 (JSON source, interceptor + multiplexing selector, Kafka and memory channels, Avro sink):

flume-ng agent --name ta1 --conf conf --conf-file /home/cloudera/santanu/flume_interceptor_multiplexing.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with JSON Source, Kafka and Memory Channels, Avro Sink
ta1.sources = twitter
ta1.sinks = avrofile
ta1.channels = memchannel kafkachannel

## Properties
## Sources
ta1.sources.twitter.type = exec
ta1.sources.twitter.command = tail -F /home/cloudera/workspace/Cloudera_Share/Bigdata.json

## Sinks
ta1.sinks.avrofile.type = avro
ta1.sinks.avrofile.hostname = 192.XXX.x.x
ta1.sinks.avrofile.port = 4141

## Channel 1
ta1.channels.memchannel.type = memory
ta1.channels.memchannel.capacity = 5000
ta1.channels.memchannel.transactionCapacity = 500

## Channel 2
ta1.channels.kafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
ta1.channels.kafkachannel.kafka.bootstrap.servers = 192.XXX.x.x:9092
ta1.channels.kafkachannel.kafka.topic = MyTopic_1
ta1.channels.kafkachannel.kafka.consumer.group.id = my_group

## Interceptor
ta1.sources.twitter.interceptors = i1
ta1.sources.twitter.interceptors.i1.type = regex_extractor
ta1.sources.twitter.interceptors.i1.regex = (?i)(Python|Java|Scala|Perl|Sqoop|Flume|Kafka|Hive|Spark|Jethro|NoSQL)
ta1.sources.twitter.interceptors.i1.serializers = s1
ta1.sources.twitter.interceptors.i1.serializers.s1.name = BigData

## Source Selector
ta1.sources.twitter.selector.type = multiplexing
ta1.sources.twitter.selector.header = BigData
ta1.sources.twitter.selector.mapping.Python = kafkachannel
ta1.sources.twitter.selector.mapping.Java = kafkachannel
ta1.sources.twitter.selector.mapping.Scala = kafkachannel
ta1.sources.twitter.selector.mapping.Perl = kafkachannel
ta1.sources.twitter.selector.mapping.Sqoop = memchannel
ta1.sources.twitter.selector.mapping.Flume = memchannel
ta1.sources.twitter.selector.mapping.Kafka = memchannel
ta1.sources.twitter.selector.mapping.Hive = memchannel
ta1.sources.twitter.selector.mapping.Spark = memchannel
ta1.sources.twitter.selector.mapping.Jethro = memchannel
ta1.sources.twitter.selector.mapping.NoSQL = memchannel

## Mapping
ta1.sources.twitter.channels = kafkachannel memchannel
ta1.sinks.avrofile.channel = memchannel
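To make the routing concrete, here is a hypothetical input line and what the regex_extractor interceptor does with it (the JSON content is made up for illustration):

# A line in Bigdata.json such as:
#   {"id": 42, "text": "Learning Spark and Hive this week"}
# matches the regex on its first hit, "Spark", so the interceptor sets the header:
#   BigData = Spark
# The multiplexing selector reads that header and, per mapping.Spark = memchannel,
# routes the event to memchannel and on to the Avro sink (whereas e.g. a "Java"
# match would go to kafkachannel instead).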
2. Agent 2 (Avro source, HDFS sink):

flume-ng agent --name ta2 --conf conf --conf-file /home/cloudera/santanu/flume_avro_hdfs.conf -Dflume.root.logger=DEBUG,console

## Flume Agent with Avro Source and HDFS Sink
ta2.sources = avrofile
ta2.sinks = hdfsfile
ta2.channels = memchannel

## Properties
## Source
ta2.sources.avrofile.type = avro
ta2.sources.avrofile.bind = 192.XXX.x.x
ta2.sources.avrofile.port = 4141

## Sink
ta2.sinks.hdfsfile.type = hdfs
ta2.sinks.hdfsfile.hdfs.path = /user/cloudera/flume_avro
ta2.sinks.hdfsfile.hdfs.filePrefix = Hadoop
ta2.sinks.hdfsfile.hdfs.fileSuffix = .avro
ta2.sinks.hdfsfile.hdfs.fileType = DataStream
ta2.sinks.hdfsfile.hdfs.writeFormat = Text
ta2.sinks.hdfsfile.hdfs.rollInterval = 5
ta2.sinks.hdfsfile.serializer = avro_event
## the codec option belongs under the serializer namespace, not the sink itself
ta2.sinks.hdfsfile.serializer.compressionCodec = snappy

## Channel
ta2.channels.memchannel.type = memory
ta2.channels.memchannel.capacity = 5000
ta2.channels.memchannel.transactionCapacity = 500

## Interceptor (removes the BigData header before events are written;
## these keys must use the agent name ta2 and attach to the avrofile source)
ta2.sources.avrofile.interceptors = i2
ta2.sources.avrofile.interceptors.i2.type = remove_header
ta2.sources.avrofile.interceptors.i2.withName = BigData

## Mapping
ta2.sources.avrofile.channels = memchannel
ta2.sinks.hdfsfile.channel = memchannel
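While debugging, it can help to pull a rolled file back out of HDFS and inspect it directly (the timestamped file name below is illustrative; the HDFS sink inserts a timestamp/counter between filePrefix and fileSuffix):

hdfs dfs -ls /user/cloudera/flume_avro
hdfs dfs -get /user/cloudera/flume_avro/Hadoop.1538123456789.avro .
java -jar avro-tools-1.8.2.jar getschema Hadoop.1538123456789.avro
java -jar avro-tools-1.8.2.jar tojson Hadoop.1538123456789.avro | head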
3. (Screenshot) Avro file in HDFS with "header" and "body" as the only two columns.
Thank you,
Santanu
Created 12-10-2018 12:30 PM
It seems that the avro_event serializer only ever writes the default Flume event schema, which is why getschema keeps showing "headers" and "body".
You might want to use the AvroEventSerializer with your own schema instead:
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.schemaURL = file:///LinkToAvroSchema.avsc
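For the .avsc itself, something minimal along these lines would do; the record and field names here are hypothetical, so adapt them to your data:

{
  "type": "record",
  "name": "BigDataRecord",
  "fields": [
    { "name": "technology", "type": "string" },
    { "name": "message", "type": "string" }
  ]
}

One caveat, if I remember this serializer correctly: it expects each event body to already contain an Avro-encoded record matching the schema, it does not parse JSON for you. The schemaURL just needs to be readable by the agent; an hdfs:// URL should work as well as file://.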