Created on 10-18-2017 02:31 AM - edited 09-16-2022 05:24 AM
My Configuration is
Cloudera Manager 5.12.1
Flume 1.7.0
Kakfa 2.2.0-1.2.2.0.p0.68
I created topic "test" in kafka, and would like to configure flume to act as consumer to fetch data from this topic and save it to HDFS.
My flume configuration is
#################################
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = bda03:2182, bda04:2182,bda05:2182
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/tmp/
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
######################################
The configuration was just copied from https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
After starting the flume agent, there was no error message in flume and kafka log file.
I tried to put some records to topics:
$ kafka-console-producer --broker-list bda03:9092,bda04:9092,bda05:9092 --topic test
Hello
This is new message
However, there was nothing put into HDFS.
Then, I tried to retreive message from topic "test" via command line (with consumer group flume)
$ kafka-console-consumer --zookeeper bda03:2182,bda04:2182,bda05:2182 --topic test --consumer-property group.id=flume
The output was:
Hello
This is new message
If the flume agent could fetch message from the topic, then the output of the above command should be empty.
The result indicated that the flume agent could not get any data from Kafka.
Can anyone help?
Thanks!
Created 02-27-2019 11:29 AM
Created 01-16-2018 07:04 AM
Created 01-16-2018 10:06 AM
Created 03-16-2018 02:37 AM
hi All,
I have been trying to ingest data from kafka source to flume. I am using cloudera quickstart vm 5.13.
My flume config file is this :
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.kafka.bootstrap.servers = quickstart.cloudera:9092
agent1.sources.kafka-source.kafka.topics = Airports
agent1.sources.kafka-source.kafka.consumer.groupId = flume
agent1.sources.kafka-source-kafka.batchSize = 1
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.interceptors.i1.preserveExisting = false
agent1.sources.kafka-source.interceptors.i1.hostHeader = key
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
agent1.sources.kafka-source.kafka.enable.auto.commit = true
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 1
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
The flume command I ran is :
./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console
in /usr/lib/flume-ng location.
The kafka producer command and log is as below:
[cloudera@quickstart Desktop]$ /usr/bin/kafka-console-producer --broker-list quickstart.cloudera:9092 --topic Airports
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/16 02:07:09 INFO producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [quickstart.cloudera:9092]
buffer.memory = 33554432
client.id = console-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 1000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 1500
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka commitId : unknown
>hi
>this is a new message
>
The flume command and log is as follows :
[cloudera@quickstart flume-ng]$ ./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/jdk1.7.0_67-cloudera/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/usr/lib/flume-ng/conf:/usr/lib/flume-ng/lib/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/usr/lib/hbase/bin/../conf:/usr/java/jdk1.7.0_67-cloudera/lib/tools.jar:/usr/lib/hbase/bin/..:/usr/lib/hbase/bin/../lib/activation-1.1.jar:/usr/lib/hbase/bin/../lib/apacheds-i18n-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/api-util-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/asm-3.2.jar:/usr/lib/hbase/bin/../lib/avro.jar:/usr/lib/hbase/bin/../lib/aws-java-sdk-bundle-1.11.134.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-1.9.2.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-core-1.8.0.jar:/usr/lib/hbase/bin/../lib/commons-cli-1.2.jar:/usr/lib/hbase/bin/../lib/commons-codec-1.9.jar:/usr/lib/hbase/bin/../lib/commons-collections-3.2.2.jar:/usr/lib/hbase/bin/../lib/commons-compress-1.4.1.jar:/usr/lib/hbase/bin/../lib/commons-configuration-1.6.jar:/usr/lib/hbase/bin/../lib/commons-daemon-1.0.13.jar:/usr/lib/hbase/bin/../lib/commons-digester-1.8.jar:/usr/lib/hbase/bin/../lib/commons-el-1.0.jar:/usr/lib/hbase/bin/../lib/commons-httpclient-3.1.jar:/usr/lib/hbase/bin/../lib/commons-io-2.4.jar:/usr/lib/hbase/bin/../lib/commons-lang-2.6.jar:/usr/lib/hbase/bin/../lib/commons-logging-1.2.jar:/usr/lib/hbase/bin/../lib/commons-math-2.1.jar:/usr/lib/hbase/bin/../lib/commons-math3-3.1.1.jar:/usr/lib/hbase/bin/../lib/commons-net-3.1.jar:/usr/lib/hbase/bin/../lib/core-3.1.1.jar:/usr/lib/hbase/bin/../lib/curator-client-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-framework-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-recipes-2.7.1.jar:/usr/lib/hbase/bin/../lib/disruptor-3.3.0.jar:/usr/lib/hbase/bin/../lib/findbugs-annotations-1.3.9-1.jar:/usr/lib/hbase/bin/../lib/gson-2.2.4.jar:/usr/lib/hbase/bin/../lib/guava-12.0.1.jar:/usr/lib/hbase/bin/../lib/hamcrest-core-1.3.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-client-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-examples-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-external-blockcache-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-prefix-tree-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-procedure-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-protocol-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-resource-bundle-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rest-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-shell-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-spark-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-thrift-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/high-scale-lib-1.1.1.jar:/usr/lib/hbase/bin/../lib/hsqldb-1.8.0.10.jar:/usr/lib/hbase/bin/../lib/htrace-core-3.2.0-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core4-4.0.1-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core.jar:/usr/lib/hbase/bin/../lib/httpclient-4.2.5.jar:/usr/lib/hbase/bin/../lib/httpcore-4.2.5.jar:/usr/lib/hbase/bin/../lib/jackson-annotations-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-databind-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-jaxrs-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-xc-1.8.8.jar:/usr/lib/hbase/bin/../lib/jamon-runtime-2.4.1.jar:/usr/lib/hbase/bin/../lib/jasper-compiler-5.5.23.jar:/usr/lib/hbase/bin/../lib/jasper-runtime-5.5.23.jar:/usr/lib/hbase/bin/../lib/java-xmlbuilder-0.4.jar:/usr/lib/hbase/bin/../lib/jaxb-api-2.1.jar:/usr/lib/hbase/bin/../lib/jaxb-impl-2.2.3-1.jar:/usr/lib/hbase/bin/../lib/jcodings-1.0.8.jar:/usr/lib/hbase/bin/../lib/jersey-client-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-core-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-json-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-server-1.9.jar:/usr/lib/hbase/bin/../lib/jets3t-0.9.0.jar:/usr/lib/hbase/bin/../lib/jettison-1.3.3.jar:/usr/lib/hbase/bin/../lib/jetty-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-sslengine-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-util-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/joni-2.1.2.jar:/usr/lib/hbase/bin/../lib/jruby-cloudera-1.0.0.jar:/usr/lib/hbase/bin/../lib/jsch-0.1.42.jar:/usr/lib/hbase/bin/../lib/jsp-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1.jar:/usr/lib/hbase/bin/../lib/junit-4.12.jar:/usr/lib/hbase/bin/../lib/leveldbjni-all-1.8.jar:/usr/lib/hbase/bin/../lib/libthrift-0.9.3.jar:/usr/lib/hbase/bin/../lib/log4j-1.2.17.jar:/usr/lib/hbase/bin/../lib/metrics-core-2.2.0.jar:/usr/lib/hbase/bin/../lib/netty-all-4.0.23.Final.jar:/usr/lib/hbase/bin/../lib/paranamer-2.3.jar:/usr/lib/hbase/bin/../lib/protobuf-java-2.5.0.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5-6.1.14.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5.jar:/usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar:/usr/lib/hbase/bin/../lib/slf4j-log4j12.jar:/usr/lib/hbase/bin/../lib/snappy-java-1.0.4.1.jar:/usr/lib/hbase/bin/../lib/spymemcached-2.11.6.jar:/usr/lib/hbase/bin/../lib/xmlenc-0.52.jar:/usr/lib/hbase/bin/../lib/xz-1.0.jar:/usr/lib/hbase/bin/../lib/zookeeper.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/etc/hadoop/conf/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/zookeeper/*:/usr/lib/zookeeper/lib/*::/conf:/lib/*:/usr/lib/flume-ng/../search/lib/antlr-2.7.7.jar:/usr/lib/flume-ng/../search/lib/antlr-runtime-3.4.jar:/usr/lib/flume-ng/../search/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-core-0.7.2.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-dom-0.7.2.jar:/usr/lib/flume-ng/../search/lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/api-util-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/argparse4j-0.4.3.jar:/usr/lib/flume-ng/../search/lib/asm-3.2.jar:/usr/lib/flume-ng/../search/lib/asm-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-commons-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-debug-all-4.1.jar:/usr/lib/flume-ng/../search/lib/aspectjrt-1.6.5.jar:/usr/lib/flume-ng/../search/lib/avro.jar:/usr/lib/flume-ng/../search/lib/bcmail-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/bcprov-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/boilerpipe-1.1.0.jar:/usr/lib/flume-ng/../search/lib/commons-cli-1.2.jar:/usr/lib/flume-ng/../search/lib/commons-codec-1.4.jar:/usr/lib/flume-ng/../search/lib/commons-collections-3.2.2.jar:/usr/lib/flume-ng/../search/lib/commons-compress-1.4.1.jar:/usr/lib/flume-ng/../search/lib/commons-configuration-1.6.jar:/usr/lib/flume-ng/../search/lib/commons-el-1.0.jar:/usr/lib/flume-ng/../search/lib/commons-fileupload-1.3.2.jar:/usr/lib/flume-ng/../search/lib/commons-io-2.4.jar:/usr/lib/flume-ng/../search/lib/commons-lang-2.6.jar:/usr/lib/flume-ng/../search/lib/commons-logging-1.1.3.jar:/usr/lib/flume-ng/../search/lib/commons-math3-3.1.1.jar:/usr/lib/flume-ng/../search/lib/commons-net-3.1.jar:/usr/lib/flume-ng/../search/lib/concurrentlinkedhashmap-lru-1.2.jar:/usr/lib/flume-ng/../search/lib/config-1.0.2.jar:/usr/lib/flume-ng/../search/lib/curator-client-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-framework-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-recipes-2.7.1.jar:/usr/lib/flume-ng/../search/lib/dom4j-1.6.1.jar:/usr/lib/flume-ng/../search/lib/fastutil-6.3.jar:/usr/lib/flume-ng/../search/lib/fontbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/gson-2.2.4.jar:/usr/lib/flume-ng/../search/lib/guava-11.0.2.jar:/usr/lib/flume-ng/../search/lib/hadoop-annotations.jar:/usr/lib/flume-ng/../search/lib/hadoop-auth.jar:/usr/lib/flume-ng/../search/lib/hadoop-common.jar:/usr/lib/flume-ng/../search/lib/hadoop-hdfs.jar:/usr/lib/flume-ng/../search/lib/hppc-0.5.2.jar:/usr/lib/flume-ng/../search/lib/htrace-core-3.2.0-incubating.jar:/usr/lib/flume-ng/../search/lib/htrace-core4-4.0.1-incubating.jar:/usr/lib/flume-ng/../search/lib/httpclient-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpcore-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpmime-4.2.5.jar:/usr/lib/flume-ng/../search/lib/isoparser-1.0-RC-1.jar:/usr/lib/flume-ng/../search/lib/jackson-annotations-2.3.0.jar:/usr/lib/flume-ng/../search/lib/jackson-core-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-core-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/jackson-databind-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/javax.servlet-3.0.0.v201112011016.jar:/usr/lib/flume-ng/../search/lib/jcl-over-slf4j-1.7.5.jar:/usr/lib/flume-ng/../search/lib/jdom-1.0.jar:/usr/lib/flume-ng/../search/lib/jempbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/jersey-core-1.9.jar:/usr/lib/flume-ng/../search/lib/jersey-server-1.9.jar:/usr/lib/flume-ng/../search/lib/jetty-continuation-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-deploy-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-http-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-io-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-jmx-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-security-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-server-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-servlet-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-util-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-webapp-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-xml-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jhighlight-1.0.jar:/usr/lib/flume-ng/../search/lib/joda-time-1.6.jar:/usr/lib/flume-ng/../search/lib/jsch-0.1.42.jar:/usr/lib/flume-ng/../search/lib/jsr305-1.3.9.jar:/usr/lib/flume-ng/../search/lib/juniversalchardet-1.0.3.jar:/usr/lib/flume-ng/../search/lib/kite-hadoop-compatibility.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-parquet-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-rcfile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-sequencefile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-json.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-maxmind.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-metrics-servlets.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-saxon.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-cell.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-tika-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-tika-decompress.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-twitter.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-useragent.jar:/usr/lib/flume-ng/../search/lib/leveldbjni-all-1.8.jar:/usr/lib/flume-ng/../search/lib/log4j-1.2.17.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-common.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-kuromoji.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-phonetic.jar:/usr/lib/flume-ng/../search/lib/lucene-codecs.jar:/usr/lib/flume-ng/../search/lib/lucene-core.jar:/usr/lib/flume-ng/../search/lib/lucene-expressions.jar:/usr/lib/flume-ng/../search/lib/lucene-grouping.jar:/usr/lib/flume-ng/../search/lib/lucene-highlighter.jar:/usr/lib/flume-ng/../search/lib/lucene-join.jar:/usr/lib/flume-ng/../search/lib/lucene-memory.jar:/usr/lib/flume-ng/../search/lib/lucene-misc.jar:/usr/lib/flume-ng/../search/lib/lucene-queries.jar:/usr/lib/flume-ng/../search/lib/lucene-queryparser.jar:/usr/lib/flume-ng/../search/lib/lucene-spatial.jar:/usr/lib/flume-ng/../search/lib/lucene-suggest.jar:/usr/lib/flume-ng/../search/lib/maxmind-db-1.0.0.jar:/usr/lib/flume-ng/../search/lib/metadata-extractor-2.6.2.jar:/usr/lib/flume-ng/../search/lib/metrics-core-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-healthchecks-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-json-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-jvm-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-servlets-3.0.2.jar:/usr/lib/flume-ng/../search/lib/netcdf-4.2-min.jar:/usr/lib/flume-ng/../search/lib/netty-3.10.5.Final.jar:/usr/lib/flume-ng/../search/lib/netty-all-4.0.23.Final.jar:/usr/lib/flume-ng/../search/lib/noggit-0.5.jar:/usr/lib/flume-ng/../search/lib/org.restlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/org.restlet.ext.servlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/paranamer-2.3.jar:/usr/lib/flume-ng/../search/lib/parquet-avro.jar:/usr/lib/flume-ng/../search/lib/parquet-column.jar:/usr/lib/flume-ng/../search/lib/parquet-common.jar:/usr/lib/flume-ng/../search/lib/parquet-encoding.jar:/usr/lib/flume-ng/../search/lib/parquet-format.jar:/usr/lib/flume-ng/../search/lib/parquet-hadoop.jar:/usr/lib/flume-ng/../search/lib/parquet-jackson.jar:/usr/lib/flume-ng/../search/lib/pdfbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/poi-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-schemas-3.10.1.jar:/usr/lib/flume-ng/../search/lib/poi-scratchpad-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/protobuf-java-2.5.0.jar:/usr/lib/flume-ng/../search/lib/rome-0.9.jar:/usr/lib/flume-ng/../search/lib/Saxon-HE-9.5.1-5.jar:/usr/lib/flume-ng/../search/lib/snakeyaml-1.10.jar:/usr/lib/flume-ng/../search/lib/snappy-java-1.0.4.1.jar:/usr/lib/flume-ng/../search/lib/solr-cell.jar:/usr/lib/flume-ng/../search/lib/solr-core.jar:/usr/lib/flume-ng/../search/lib/solr-solrj.jar:/usr/lib/flume-ng/../search/lib/spatial4j-0.4.1.jar:/usr/lib/flume-ng/../search/lib/tagsoup-1.2.1.jar:/usr/lib/flume-ng/../search/lib/tika-core-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-parsers-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-xmp-1.5.jar:/usr/lib/flume-ng/../search/lib/ua-parser-1.3.0.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1-tests.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-tika-0.1.jar:/usr/lib/flume-ng/../search/lib/wstx-asl-3.2.7.jar:/usr/lib/flume-ng/../search/lib/xercesImpl-2.9.1.jar:/usr/lib/flume-ng/../search/lib/xml-apis-1.3.04.jar:/usr/lib/flume-ng/../search/lib/xmlbeans-2.6.0.jar:/usr/lib/flume-ng/../search/lib/xmlenc-0.52.jar:/usr/lib/flume-ng/../search/lib/xmpcore-5.1.2.jar:/usr/lib/flume-ng/../search/lib/xz-1.0.jar:/usr/lib/flume-ng/../search/lib/zookeeper.jar' -Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hbase/bin/../lib/native/Linux-amd64-64 org.apache.flume.node.Application -conf-file conf/flumekafka.conf --name agent1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-03-16 02:06:50,988 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-03-16 02:06:51,005 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flumekafka.conf
2018-03-16 02:06:51,025 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfs-sink Agent: agent1
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-03-16 02:06:51,140 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel memory-channel type memory
2018-03-16 02:06:51,171 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel memory-channel
2018-03-16 02:06:51,172 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source kafka-source, type org.apache.flume.source.kafka.KafkaSource
2018-03-16 02:06:51,209 (conf-file-poller-0) [INFO - org.apache.flume.source.kafka.KafkaSource.doConfigure(KafkaSource.java:400)] Group ID was not specified. Using flume as the group id.
2018-03-16 02:06:51,347 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: hdfs-sink, type: hdfs
2018-03-16 02:06:51,393 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel memory-channel connected to [kafka-source, hdfs-sink]
2018-03-16 02:06:51,436 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{kafka-source=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@598ef578 counterGroup:{ name:null counters:{} } }} channels:{memory-channel=org.apache.flume.channel.MemoryChannel{name: memory-channel}} }
2018-03-16 02:06:51,450 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel memory-channel
2018-03-16 02:06:51,586 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memory-channel: Successfully registered new MBean.
2018-03-16 02:06:51,587 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memory-channel started
2018-03-16 02:06:51,587 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink hdfs-sink
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: hdfs-sink started
2018-03-16 02:06:51,606 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source kafka-source
2018-03-16 02:06:51,607 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:496)] Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE}...
2018-03-16 02:06:51,701 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:171)] ConsumerConfig values:
request.timeout.ms = 40000
check.crcs = true
retry.backoff.ms = 100
ssl.truststore.password = null
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 65536
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
session.timeout.ms = 30000
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [quickstart.cloudera:9092]
client.id =
fetch.max.wait.ms = 500
fetch.min.bytes = 1
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
auto.offset.reset = latest
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
ssl.endpoint.identification.algorithm = null
max.partition.fetch.bytes = 1048576
ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
auto.commit.interval.ms = 5000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
group.id = flume
enable.auto.commit = false
metric.reporters = []
ssl.truststore.type = JKS
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
heartbeat.interval.ms = 3000
2018-03-16 02:06:51,909 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration groupId = flume was supplied but isn't a known config.
2018-03-16 02:06:51,910 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration timeout.ms = 100 was supplied but isn't a known config.
2018-03-16 02:06:51,919 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka version : 0.9.0-kafka-2.0.2
2018-03-16 02:06:51,920 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)] Kafka commitId : unknown
There are no errors encountered , but no messages are being written to HDFS. Could someone help. I tried almost every different combination of solution available in the forum.
Thanks in Advance
MMG