Flume to Elasticsearch data transfer: Avro conversion

Hello,

I am trying to use Flume to send data to Elasticsearch. Flume collects the logs from Kafka and sinks them to Elasticsearch. The logs in Kafka are Avro-encoded, but the data that goes to Elasticsearch has to be JSON. Can anyone suggest an existing interceptor or serializer that converts Avro into a format suitable for Elasticsearch?

Here is what I have so far:

agent.sources=test_source
agent.channels=test_channel
agent.sinks=test_sync

agent.sources.test_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.test_source.zookeeperConnect = prod-zk1.internal:2181/kafka
agent.sources.test_source.topic = test_elastic
agent.sources.test_source.groupId = test
agent.sources.test_source.kafka.consumer.timeout.ms = 5000
agent.sources.test_source.batchDurationMillis = 1000
agent.sources.test_source.batchSize = 1000
agent.sources.test_source.kafka.auto.commit.enable = true
agent.sources.test_source.interceptors = i1
agent.sources.test_source.interceptors.i1.type = static
agent.sources.test_source.interceptors.i1.key = flume.avro.schema.url
agent.sources.test_source.interceptors.i1.value = hdfs://nameservice/user/flafka/avro_templates/test_elastic.avsc
agent.sources.test_source.channels = test_channel

agent.channels.test_channel.type = memory
agent.channels.test_channel.capacity = 100000
agent.channels.test_channel.transactionCapacity = 1000

agent.sinks.test_sync.type = elasticsearch
agent.sinks.test_sync.hostNames = prod-elastic.internal:9300
agent.sinks.test_sync.indexName = test
agent.sinks.test_sync.indexType = test
agent.sinks.test_sync.clusterName = prod
agent.sinks.test_sync.batchSize = 1000
agent.sinks.test_sync.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent.sinks.test_sync.channel = test_channel

Thanks in advance.