Created 08-24-2017 09:39 AM
Hi All,
I'm facing an issue with my Flume agent when the sink is configured as a KafkaSink. The KafkaSink does not write any messages to the Kafka topic, and I don't see any errors in the Flume agent logs.
HDP - 2.6.1
Flume source and channels are working as expected with File Roll Sink.
Below is the flume sink configuration.
testagent2.sinks.kafkasink2.channel = memchannel2
testagent2.sinks.kafkasink2.type = org.apache.flume.sink.kafka.KafkaSink
testagent2.sinks.kafkasink2.brokerList = brokerServer:portNo
testagent2.sinks.kafkasink2.zookeeperConnect = ZookeeperServer:portNo
testagent2.sinks.kafkasink2.topic = testtopic2
testagent2.sinks.kafkasink2.batchSize = 100
testagent2.sinks.kafkasink2.requiredAcks = 1
testagent2.sinks.kafkasink2.kafka.rebalance.max.retries = 40
testagent2.sinks.kafkasink2.kafka.rebalance.backoff.ms = 5000
testagent2.sinks.kafkasink2.kafka.zookeeper.session.timeout.ms = 6000
Created 08-24-2017 05:29 PM
Update 1:
When using only a Kafka sink with other types of source and channel, I'm able to push messages to the Kafka topic.
It fails when the kafkaSink is used together with a kafkaSource or kafkaChannel.
The following combinations have been tested:
1. kafkaSource, kafkaChannel, other sinks -- works
2. kafkaSource, kafkaChannel, kafkaSink -- fails
3. any source (except Kafka), any channel (except Kafka), kafkaSink -- works
4. kafkaSource, memoryChannel, kafkaSink -- fails
5. any source, kafkaChannel, kafkaSink -- fails
Has anyone tried using both a kafkaSource and a kafkaSink in the same Flume agent?
Thanks.
Created 08-29-2017 03:01 PM
@D Giri I have tried the combination of a Kafka channel and a Kafka sink, with an exec source named pstream.
Source (pstream) -> Channel (kafkaChannel) -> Sink (kafkaSink)
Please find below the configs that I used:
agent1.sources = pstream
agent1.channels = kafkaChannel
agent1.sinks = kafkaSink

agent1.sources.pstream.type = exec
agent1.sources.pstream.channels = kafkaChannel
agent1.sources.pstream.command = python print_number.py

agent1.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafkaChannel.kafka.topic = dharmik-test
agent1.channels.kafkaChannel.parseAsFlumeEvent = false
agent1.channels.kafkaChannel.kafka.bootstrap.servers = <kafkaBroker:port>

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.channel = kafkaChannel
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.topic = dharmik-test
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.kafka.topic.metadata.refresh.interval.ms = 1000
agent1.sinks.kafkaSink.brokerList = <kafkaBroker:port>
print_number.py just prints the numbers between 0 and 100.
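For reference, a minimal sketch of what print_number.py could look like (this is an assumption based only on the description above; the original script isn't shown):

```python
import sys

def number_lines(start=0, end=100):
    """Return the lines the script emits, one number per line (assumed behavior)."""
    return [str(i) for i in range(start, end + 1)]

if __name__ == "__main__":
    for line in number_lines():
        print(line)
        # Flush so the Flume exec source sees each line promptly
        # rather than waiting for the stdout buffer to fill.
        sys.stdout.flush()
```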
You can check whether the data is being produced correctly by the kafkaSink using the console consumer CLI:
sh /usr/hdp/2.6.2.0-152/kafka/bin/kafka-console-consumer.sh --new-consumer --topic dharmik-test --bootstrap-server <kafkaBroker:port>
Please give it a try and let me know if you face any issue.
Created 09-04-2017 04:25 PM
Thanks @dthakkar for your valuable response.
Will give it a try from my end.
Created 09-04-2017 04:53 PM
agent1.channels.kafkaChannel.parseAsFlumeEvent = false
The above configuration made the difference.
Thanks a lot.
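For anyone hitting the same problem: as I understand it, parseAsFlumeEvent controls whether the Kafka channel stores records as Avro-wrapped Flume events (the default, true) or as plain message bodies. When the topic is also read or written by non-Flume producers/consumers, or the events originate outside a Flume source, setting it to false avoids the serialization mismatch. The relevant channel fragment (broker address is a placeholder) is:

```
agent1.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafkaChannel.kafka.bootstrap.servers = <kafkaBroker:port>
# false => store plain message bodies, readable by plain Kafka consumers
agent1.channels.kafkaChannel.parseAsFlumeEvent = false
```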
Created 09-04-2017 06:21 PM
@D Giri Is it working for you now across the various scenarios?