
Can schemaURL be configured in the Flume conf rather than passed in the event header?


I have Avro data arriving on a Kafka topic. Flume reads the events from Kafka and, using the Kite DatasetSink, writes them to HDFS as Parquet.

My Flume config is as below:

agent.sinks.k1.channel = c1
agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.k1.hdfs.filePrefix = parquetdata
agent.sinks.k1.hdfs.fileSuffix = .parquet
agent.sinks.k1.hdfs.fileType = DataStream
#agent.sinks.k1.hdfs.rollInterval = 30
#agent.sinks.k1.hdfs.rollCount = 1
#agent.sinks.k1.hdfs.batchSize = 1
agent.sinks.k1.kite.batchSize = 2
agent.sinks.k1.kite.rollInterval = 30
agent.sinks.k1.kite.flushable.commitOnBatch = true
#agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
#agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
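For reference, the error below says the event headers must carry either flume.avro.schema.url or flume.avro.schema.literal. One way to supply that value from the Flume conf itself (rather than from the Kafka producer) would be a static interceptor on the source, which stamps a fixed header onto every event. This is only a sketch under assumptions: the source name r1 is hypothetical, since my source definition isn't shown above.

agent.sources.r1.interceptors = schemaUrl
agent.sources.r1.interceptors.schemaUrl.type = static
agent.sources.r1.interceptors.schemaUrl.key = flume.avro.schema.url
agent.sources.r1.interceptors.schemaUrl.value = hdfs://namenodeHA/kite/item.avsc

With this in place, every event reaching the sink should carry the schema URL header, so the serializer.schemaURL line on the sink may be redundant.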

I am getting the below exception in the Flume logs:

2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":.................................. ,{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
    at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)