10-18-2017 02:32 AM
The Flume agent log file was:

2017-10-18 17:01:49,742 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2017-10-18 17:01:49,760 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/run/cloudera-scm-agent/process/2727-flume-AGENT/flume.conf
2017-10-18 17:01:49,765 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: sink1 Agent: tier1
2017-10-18 17:01:49,766 INFO org.apache.flume.conf.FlumeConfiguration: Processing:sink1
2017-10-18 17:01:49,783 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [tier1]
2017-10-18 17:01:49,783 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2017-10-18 17:01:49,790 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel channel1 type memory
2017-10-18 17:01:49,798 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel channel1
2017-10-18 17:01:49,799 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source source1, type org.apache.flume.source.kafka.KafkaSource
2017-10-18 17:01:49,835 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
2017-10-18 17:01:49,847 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel channel1 connected to [source1, sink1]
2017-10-18 17:01:49,854 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{source1=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@705cdf05 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
2017-10-18 17:01:49,855 INFO org.apache.flume.node.Application: Starting Channel channel1
2017-10-18 17:01:49,908 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2017-10-18 17:01:49,909 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
2017-10-18 17:01:49,909 INFO org.apache.flume.node.Application: Starting Sink sink1
2017-10-18 17:01:49,910 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
2017-10-18 17:01:49,911 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
2017-10-18 17:01:49,913 INFO org.apache.flume.node.Application: Starting Source source1
2017-10-18 17:01:49,914 INFO org.apache.flume.source.kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:source1,state:IDLE}...
2017-10-18 17:01:49,941 INFO org.apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
	metric.reporters = []
	metadata.max.age.ms = 300000
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	group.id = abcd
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	max.partition.fetch.bytes = 1048576
	bootstrap.servers = [bda03.aslsdc.local:9092, bda04.aslsdc.local:9092, bda05.aslsdc.local:9092]
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.keystore.type = JKS
	ssl.trustmanager.algorithm = PKIX
	enable.auto.commit = false
	ssl.key.password = null
	fetch.max.wait.ms = 500
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	ssl.truststore.password = null
	session.timeout.ms = 30000
	metrics.num.samples = 2
	client.id =
	ssl.endpoint.identification.algorithm = null
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	ssl.protocol = TLS
	check.crcs = true
	request.timeout.ms = 40000
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.keystore.location = null
	heartbeat.interval.ms = 3000
	auto.commit.interval.ms = 5000
	receive.buffer.bytes = 65536
	ssl.cipher.suites = null
	ssl.truststore.type = JKS
	security.protocol = PLAINTEXT
	ssl.truststore.location = null
	ssl.keystore.password = null
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	fetch.min.bytes = 1
	send.buffer.bytes = 131072
	auto.offset.reset = latest

2017-10-18 17:01:49,952 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2017-10-18 17:01:49,994 INFO org.mortbay.log: jetty-6.1.26.cloudera.4
2017-10-18 17:01:50,016 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2017-10-18 17:01:50,066 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
2017-10-18 17:01:50,066 INFO org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : unknown
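One thing that stands out in the ConsumerConfig above: the source joined with group.id = abcd rather than the configured groupId of flume, and auto.offset.reset = latest means a group with no previously committed offsets will only see messages produced after the consumer starts. A way to check what group abcd has actually consumed might be the following sketch (the group name and broker host are taken from the log above; exact flags can vary by Kafka version, this is untested):

$ kafka-consumer-groups --new-consumer --bootstrap-server bda03.aslsdc.local:9092 --describe --group abcd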
10-18-2017 02:31 AM
My configuration is:

Cloudera Manager 5.12.1
Flume 1.7.0
Kafka 2.2.0-1.2.2.0.p0.68

I created the topic "test" in Kafka and would like to configure Flume to act as a consumer that fetches data from this topic and saves it to HDFS. My Flume configuration is:

#################################
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = bda03:2182,bda04:2182,bda05:2182
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/tmp/
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
######################################

The configuration was copied from https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

After starting the Flume agent, there were no error messages in the Flume or Kafka log files. I tried to put some records into the topic:

$ kafka-console-producer --broker-list bda03:9092,bda04:9092,bda05:9092 --topic test
Hello
This is new message

However, nothing was written to HDFS. I then tried to retrieve messages from the topic "test" via the command line (with consumer group flume):

$ kafka-console-consumer --zookeeper bda03:2182,bda04:2182,bda05:2182 --topic test --consumer-property group.id=flume

The output was:

Hello
This is new message

If the Flume agent had been fetching messages from the topic under that consumer group, the output of the above command should have been empty. The result indicates that the Flume agent is not getting any data from Kafka. Can anyone help? Thanks!
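For comparison, the Flume 1.7 Kafka source documents new-style kafka.* properties (kafka.bootstrap.servers, kafka.topics, kafka.consumer.group.id) in place of the deprecated zookeeperConnect/topic/groupId keys used above. An untested sketch of the source section with those keys, assuming the brokers listen on port 9092 as in the producer command:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = bda03:9092,bda04:9092,bda05:9092
tier1.sources.source1.kafka.topics = test
tier1.sources.source1.kafka.consumer.group.id = flume

With these keys the source talks to the brokers directly rather than via ZooKeeper, which matches the bootstrap.servers list visible in the agent log above.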
Labels:
- Apache Flume
- Apache Kafka