Support Questions
Find answers, ask questions, and share your expertise
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Flume to elastic data transfer avro conversion

Flume to elastic data transfer avro conversion



I am trying to use flume to send data to elastic . Flume collects the logs from kafka and sinks it to elastic. log format in kafka is in avro. Data that goes to elastic has to be in json format. can anyone suggest me an existing interceptor/serializer to convert avro to elastic suitable format .

Here is what i have so far.


agent.sources.test_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.test_source.zookeeperConnect = prod-zk1.internal:2181/kafka
agent.sources.test_source.topic = test_elastic
agent.sources.test_source.groupId = test = 5000
agent.sources.test_source.batchDurationMillis = 1000
agent.sources.test_source.batchSize = 1000 = true
agent.sources.test_source.interceptors = i1
agent.sources.test_source.interceptors.i1.type = static
agent.sources.test_source.interceptors.i1.key = flume.avro.schema.url
agent.sources.test_source.interceptors.i1.value = hdfs://nameservice/user/flafka/avro_templates/test_elastic.avsc
agent.sources.test_source.channels = test_channel

agent.channels.test_channel.type = memory
agent.channels.test_channel.capacity = 100000
agent.channels.test_channel.transactionCapacity = 1000

agent.sinks.test_sync.type = elasticsearch
agent.sinks.test_sync.hostNames = prod-elastic.internal:9300
agent.sinks.test_sync.indexName = test
agent.sinks.test_sync.indexType = test
agent.sinks.test_sync.clusterName = prod
agent.sinks.test_sync.batchSize = 1000
agent.sinks.test_sync.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer = test_channel

Thanks in advance.

Don't have an account?
Coming from Hortonworks? Activate your account here