Created 09-30-2021 08:36 AM
We are experiencing a strange issue: Avro files that Flume builds from the JSON records it receives via Kafka appear to be corrupted once the HDFS Sink lands them on HDFS.
We are on the following platform:
Version: Cloudera Enterprise 6.3.4 (#6763501 built by jenkins on 20201028-1951 git: d5d1c716a1d40105495ed484bd6e49813617be03)
Java VM Name: Java HotSpot(TM) 64-Bit Server VM
Java Version: 1.8.0_181
I stripped our data model down to a bare minimum that consistently reproduces the behavior. This is our sample JSON message:
{"idEnrichedEvent":"ogtpcdwkgk"}
This is our Avro schema -- we expect a single field, "idEnrichedEvent", to be present for the sample (a local round-trip check with this schema follows below).
{
  "type": "record",
  "name": "foobar",
  "fields": [
    {
      "name": "idEnrichedEvent",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}
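To rule out the schema itself, a local round-trip can be run with the plain Avro Java API -- a minimal sketch, assuming avro and snappy-java are on the classpath; the class name, local file names, and the field value are placeholders, not part of our Flume setup:

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import java.io.File;

public class FoobarRoundTrip {
    public static void main(String[] args) throws Exception {
        // parse the schema shown above from a local copy of the .avsc file
        Schema schema = new Schema.Parser().parse(new File("foobar.avsc"));

        // build one record matching the sample JSON message
        GenericRecord record = new GenericData.Record(schema);
        record.put("idEnrichedEvent", "ogtpcdwkgk");

        // write a snappy-compressed Avro data file, like the sink is configured to do
        File out = new File("foobar-check.avro");
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.setCodec(CodecFactory.snappyCodec());
            writer.create(schema, out);
            writer.append(record);
        }

        // read it straight back
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(out, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord r : reader) {
                System.out.println(r);   // expect {"idEnrichedEvent": "ogtpcdwkgk"}
            }
        }
    }
}

If this prints the record back, the schema round-trips fine in isolation and the problem is more likely somewhere on the Flume/serializer side.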
This is what our Flume configuration looks like:
foobar.sources = kafka-source
foobar.channels = hdfs-channel
foobar.sinks = hdfs-sink
foobar.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
foobar.sources.kafka-source.kafka.bootstrap.servers = XXX, XXX
foobar.sources.kafka-source.consumer.group.id = XXX
foobar.sources.kafka-source.topic = XXX
foobar.sources.kafka-source.batchSize = 100
foobar.sources.kafka-source.channels = hdfs-channel
foobar.sources.kafka-source.kafka.consumer.security.protocol = SSL
foobar.sources.kafka-source.kafka.consumer.ssl.truststore.location=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.truststore.password=changeit
foobar.sources.kafka-source.kafka.consumer.ssl.keystore.location=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.keystore.password=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.key.password=XXX
foobar.sources.kafka-source.interceptors = hostNameInterceptor
foobar.sources.kafka-source.interceptors.hostNameInterceptor.type = host
foobar.sources.kafka-source.interceptors.hostNameInterceptor.preserveExisting = false
foobar.sources.kafka-source.interceptors.hostNameInterceptor.hostHeader = hostname
foobar.sinks.hdfs-sink.channel = hdfs-channel
foobar.sinks.hdfs-sink.type = hdfs
foobar.sinks.hdfs-sink.hdfs.fileType = DataStream
foobar.sinks.hdfs-sink.hdfs.fileSuffix = .avro
foobar.sinks.hdfs-sink.hdfs.writeFormat = Text
foobar.sinks.hdfs-sink.hdfs.path = hdfs://nameservice1/XXX/foobar/t_period_cd=%Y-%m-%d
foobar.sinks.hdfs-sink.hdfs.filePrefix = topic_data-%{hostname}
foobar.sinks.hdfs-sink.hdfs.inUsePrefix = ._
foobar.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
foobar.sinks.hdfs-sink.hdfs.kerberosPrincipal = XXX@XXX
foobar.sinks.hdfs-sink.hdfs.kerberosKeytab = /var/lib/flume-ng/XXX.keytab
foobar.sinks.hdfs-sink.hdfs.idleTimeout=600
foobar.sinks.hdfs-sink.hdfs.rollSize = 128000000
foobar.sinks.hdfs-sink.hdfs.rollCount = 1000
foobar.sinks.hdfs-sink.hdfs.rollInterval = 3600
foobar.sinks.hdfs-sink.hdfs.minBlockReplicas = 1
foobar.sinks.hdfs-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
foobar.sinks.hdfs-sink.serializer.compressionCodec=snappy
foobar.sinks.hdfs-sink.serializer.schemaURL = hdfs://nameservice1/XXX/avro/foobar.avsc
foobar.channels.hdfs-channel.capacity = 1000
foobar.channels.hdfs-channel.transactionCapacity = 1000
foobar.channels.hdfs-channel.type = memory
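For completeness, the sample message can be pushed into the topic with a minimal producer along these lines (a sketch only -- broker address, topic name, and SSL paths are placeholders, not our real values):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093");   // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");   // placeholder path
        props.put("ssl.truststore.password", "changeit");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the stripped-down JSON record from above, sent as a plain string value
            producer.send(new ProducerRecord<>("foobar-topic",
                    "{\"idEnrichedEvent\":\"ogtpcdwkgk\"}"));
            producer.flush();
        }
    }
}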
While avro-tools correctly shows the desired schema like this:
yarn jar /opt/cloudera/parcels/CDH/jars/avro-tools-1.8.2-cdh6.3.4.jar getmeta XXXX/foobar/t_period_cd=2021-09-30/topic_data-XX.XX.XXX.XX.XXXXXXXXXXXXX.avro
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
avro.schema {"type":"record","name":"foobar","fields":[{"name":"idEnrichedEvent","type":["null","string"]}]}
avro.codec snappy
it fails with the following error when trying to print the data.
yarn jar /opt/cloudera/parcels/CDH/jars/avro-tools-1.8.2-cdh6.3.4.jar tojson XXXX/foobar/t_period_cd=2021-09-30/topic_data-XX.XX.XXX.XX.XXXXXXXXXXXXX.avro
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -62
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:436)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:76)
at org.apache.avro.tool.Main.run(Main.java:87)
at org.apache.avro.tool.Main.main(Main.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:313)
at org.apache.hadoop.util.RunJar.main(RunJar.java:227)
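To take avro-tools out of the equation, a file pulled off HDFS with hdfs dfs -get can be walked record by record with a small standalone reader -- again just a sketch, with a placeholder local file name -- to see whether the header reads cleanly and how many records decode before the failure:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import java.io.File;

public class ReadBack {
    public static void main(String[] args) throws Exception {
        File in = new File("topic_data-local-copy.avro");   // placeholder: local copy of the HDFS file
        long count = 0;
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(in, new GenericDatumReader<GenericRecord>())) {
            // the file header (schema, codec) is read here, before any data blocks
            System.out.println("writer schema: " + reader.getSchema());
            while (reader.hasNext()) {
                reader.next();   // a corrupt datum should throw here, as in the stack trace above
                count++;
            }
        } finally {
            System.out.println("records decoded before failure/end of file: " + count);
        }
    }
}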
Can someone point me in the right direction? Am I missing something in the configuration, or is this a bug?