Problem about Configuring Flume as Kafka Consumer


My configuration is:

Cloudera Manager 5.12.1

Flume 1.7.0

Kafka 2.2.0-1.2.2.0.p0.68

 

I created a topic named "test" in Kafka and would like to configure Flume to act as a consumer that fetches data from this topic and saves it to HDFS.

My Flume configuration is:

#################################

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = bda03:2182, bda04:2182,bda05:2182
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /user/flume/tmp/
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

######################################

The configuration was copied from https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

 

After starting the Flume agent, there were no error messages in the Flume or Kafka log files.

 

I tried to put some records into the topic:

$ kafka-console-producer --broker-list bda03:9092,bda04:9092,bda05:9092 --topic test

Hello

This is new message

 

However, nothing was written to HDFS.

Then I tried to retrieve the messages from topic "test" from the command line (with consumer group flume):

$ kafka-console-consumer --zookeeper bda03:2182,bda04:2182,bda05:2182 --topic test --consumer-property group.id=flume

The output was:

Hello

This is new message

 

If the Flume agent had been able to fetch messages from the topic, the output of the above command should have been empty.

This result indicates that the Flume agent could not get any data from Kafka.

 

Can anyone help?

Thanks!

 

 

13 REPLIES

That's odd that the VM is read-only. Are you making the change in CM for the Flume logging safety valve?

-pd

tier1.sources.source1.zookeeperConnect is deprecated;
use tier1.sources.source1.kafka.bootstrap.servers = bda03:9092,bda04:9092,bda05:9092 instead.
Em Jay
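
A minimal sketch of how the source section of the original tier1 configuration might look with the non-deprecated property names from the Flume 1.7 user guide (kafka.bootstrap.servers, kafka.topics, kafka.consumer.group.id). The broker addresses are taken from the producer command in the question; this has not been verified against that cluster.

```properties
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
# Connect to the brokers directly instead of going through ZooKeeper
tier1.sources.source1.kafka.bootstrap.servers = bda03:9092,bda04:9092,bda05:9092
# kafka.topics replaces the deprecated "topic" property
tier1.sources.source1.kafka.topics = test
# kafka.consumer.group.id replaces the deprecated "groupId" property
tier1.sources.source1.kafka.consumer.group.id = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
```

The channel and sink sections of the original configuration can stay as they are.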

As Manikumar noted above, the old Flume agent configuration has been deprecated. You can refer to the Flume Kafka source documentation here:
http://flume.apache.org/FlumeUserGuide.html

Also, you can confirm with ConsumerGroupCommand that the Flume agents are in an active consumer group:

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server ${HOSTNAME}:9092 --describe --group flume --new-consumer

-pd


Hi all,

I have been trying to ingest data from a Kafka source into Flume. I am using the Cloudera QuickStart VM 5.13.

My Flume config file is:

 agent1.sources = kafka-source
 agent1.channels = memory-channel
 agent1.sinks = hdfs-sink

 agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
 agent1.sources.kafka-source.kafka.bootstrap.servers = quickstart.cloudera:9092
 agent1.sources.kafka-source.kafka.topics = Airports
 agent1.sources.kafka-source.kafka.consumer.groupId = flume
 agent1.sources.kafka-source-kafka.batchSize = 1
 agent1.sources.kafka-source.channels = memory-channel
 agent1.sources.kafka-source.interceptors = i1
 agent1.sources.kafka-source.interceptors.i1.type = timestamp
 agent1.sources.kafka-source.interceptors.i1.preserveExisting = false
 agent1.sources.kafka-source.interceptors.i1.hostHeader = key
 agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
 agent1.sources.kafka-source.kafka.enable.auto.commit = true

 agent1.channels.memory-channel.type = memory
 agent1.channels.memory-channel.capacity = 10000
 agent1.channels.memory-channel.transactionCapacity = 1000
 
 agent1.sinks.hdfs-sink.type = hdfs
 agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
 agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
 agent1.sinks.hdfs-sink.hdfs.rollSize = 1
 agent1.sinks.hdfs-sink.hdfs.rollCount = 0
 agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
 agent1.sinks.hdfs-sink.channel = memory-channel

The Flume command I ran is:

./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console

I ran it from the /usr/lib/flume-ng directory.

The Kafka producer command and log are as below:

[cloudera@quickstart Desktop]$ /usr/bin/kafka-console-producer --broker-list quickstart.cloudera:9092 --topic Airports
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/16 02:07:09 INFO producer.ProducerConfig: ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [quickstart.cloudera:9092]
    buffer.memory = 33554432
    client.id = console-producer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 1000
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 1500
    retries = 3
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka commitId : unknown
>hi
>this is a new message
>

The Flume command and log are as follows:

[cloudera@quickstart flume-ng]$ ./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/jdk1.7.0_67-cloudera/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/usr/lib/flume-ng/conf:/usr/lib/flume-ng/lib/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/usr/lib/hbase/bin/../conf:/usr/java/jdk1.7.0_67-cloudera/lib/tools.jar:/usr/lib/hbase/bin/..:/usr/lib/hbase/bin/../lib/activation-1.1.jar:/usr/lib/hbase/bin/../lib/apacheds-i18n-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/api-util-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/asm-3.2.jar:/usr/lib/hbase/bin/../lib/avro.jar:/usr/lib/hbase/bin/../lib/aws-java-sdk-bundle-1.11.134.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-1.9.2.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-core-1.8.0.jar:/usr/lib/hbase/bin/../lib/commons-cli-1.2.jar:/usr/lib/hbase/bin/../lib/commons-codec-1.9.jar:/usr/lib/hbase/bin/../lib/commons-collections-3.2.2.jar:/usr/lib/hbase/bin/../lib/commons-compress-1.4.1.jar:/usr/lib/hbase/bin/../lib/commons-configuration-1.6.jar:/usr/lib/hbase/bin/../lib/commons-daemon-1.0.13.jar:/usr/lib/hbase/bin/../lib/commons-digester-1.8.jar:/usr/lib/hbase/bin/../lib/commons-el-1.0.jar:/usr/lib/hbase/bin/../lib/commons-httpclient-3.1.jar:/usr/lib/hbase/bin/../lib/commons-io-2.4.jar:/usr/lib/hbase/bin/../lib/commons-lang-2.6.jar:/usr/lib/hbase/bin/../lib/commons-logging-1.2.jar:/usr/lib/hbase/bin/../lib/commons-math-2.1.jar:/usr/lib/hbase/bin/../lib/commons-math3-3.1.1.jar:/usr/lib/hbase/bin/../lib/commons-net-3.1.jar:/usr/lib/hbase/bin/../lib/core-3.1.1.jar:/usr/lib/hbase/bin/../lib/curator-client-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-framework-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-recipes-2.7.1.jar:/usr/lib/hbase/bin
/../lib/disruptor-3.3.0.jar:/usr/lib/hbase/bin/../lib/findbugs-annotations-1.3.9-1.jar:/usr/lib/hbase/bin/../lib/gson-2.2.4.jar:/usr/lib/hbase/bin/../lib/guava-12.0.1.jar:/usr/lib/hbase/bin/../lib/hamcrest-core-1.3.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-client-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-examples-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-external-blockcache-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-prefix-tree-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-procedure-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-protocol-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-resource-bundle-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rest-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-shell-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-spark-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-thrift-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/high-scale-lib-1.1.1.jar:/usr/lib/hbase/bin/../lib/hsqldb-1.8.0.10.jar:/usr/lib/hbase/bin/../lib/htrace-core-3.2.0-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core4-4.0.1
-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core.jar:/usr/lib/hbase/bin/../lib/httpclient-4.2.5.jar:/usr/lib/hbase/bin/../lib/httpcore-4.2.5.jar:/usr/lib/hbase/bin/../lib/jackson-annotations-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-databind-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-jaxrs-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-xc-1.8.8.jar:/usr/lib/hbase/bin/../lib/jamon-runtime-2.4.1.jar:/usr/lib/hbase/bin/../lib/jasper-compiler-5.5.23.jar:/usr/lib/hbase/bin/../lib/jasper-runtime-5.5.23.jar:/usr/lib/hbase/bin/../lib/java-xmlbuilder-0.4.jar:/usr/lib/hbase/bin/../lib/jaxb-api-2.1.jar:/usr/lib/hbase/bin/../lib/jaxb-impl-2.2.3-1.jar:/usr/lib/hbase/bin/../lib/jcodings-1.0.8.jar:/usr/lib/hbase/bin/../lib/jersey-client-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-core-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-json-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-server-1.9.jar:/usr/lib/hbase/bin/../lib/jets3t-0.9.0.jar:/usr/lib/hbase/bin/../lib/jettison-1.3.3.jar:/usr/lib/hbase/bin/../lib/jetty-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-sslengine-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-util-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/joni-2.1.2.jar:/usr/lib/hbase/bin/../lib/jruby-cloudera-1.0.0.jar:/usr/lib/hbase/bin/../lib/jsch-0.1.42.jar:/usr/lib/hbase/bin/../lib/jsp-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1.jar:/usr/lib/hbase/bin/../lib/junit-4.12.jar:/usr/lib/hbase/bin/../lib/leveldbjni-all-1.8.jar:/usr/lib/hbase/bin/../lib/libthrift-0.9.3.jar:/usr/lib/hbase/bin/../lib/log4j-1.2.17.jar:/usr/lib/hbase/bin/../lib/metrics-core-2.2.0.jar:/usr/lib/hbase/bin/../lib/netty-all-4.0.23.Final.jar:/usr/lib/hbase/bin/../lib/paranamer-2.3.jar:/usr/lib/hbase/bin/../lib/protobuf-java-2.5.0.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5-6.1.14
.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5.jar:/usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar:/usr/lib/hbase/bin/../lib/slf4j-log4j12.jar:/usr/lib/hbase/bin/../lib/snappy-java-1.0.4.1.jar:/usr/lib/hbase/bin/../lib/spymemcached-2.11.6.jar:/usr/lib/hbase/bin/../lib/xmlenc-0.52.jar:/usr/lib/hbase/bin/../lib/xz-1.0.jar:/usr/lib/hbase/bin/../lib/zookeeper.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/etc/hadoop/conf/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/zookeeper/*:/usr/lib/zookeeper/lib/*::/conf:/lib/*:/usr/lib/flume-ng/../search/lib/antlr-2.7.7.jar:/usr/lib/flume-ng/../search/lib/antlr-runtime-3.4.jar:/usr/lib/flume-ng/../search/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-core-0.7.2.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-dom-0.7.2.jar:/usr/lib/flume-ng/../search/lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/api-util-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/argparse4j-0.4.3.jar:/usr/lib/flume-ng/../search/lib/asm-3.2.jar:/usr/lib/flume-ng/../search/lib/asm-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-commons-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-debug-all-4.1.jar:/usr/lib/flume-ng/../search/lib/aspectjrt-1.6.5.jar:/usr/lib/flume-ng/../search/lib/avro.jar:/usr/lib/flume-ng/../search/lib/bcmail-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/bcprov-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/boilerpipe-1.1.0.jar:/usr/lib/flume-ng/../search/lib/commons-cli-1.2.jar:/usr/lib/flume-ng/../search/lib/commons-codec-1.4.jar:/usr/lib/flume-ng/../search/lib/commons-collections-3.2.2.jar:/usr/lib/flume-ng/../search/lib/commons-compress-1.4.1.jar:/usr/lib/flume-ng/../search/lib/commons-configuration-1.6.jar:/usr/lib/flume-ng/../search/lib/
commons-el-1.0.jar:/usr/lib/flume-ng/../search/lib/commons-fileupload-1.3.2.jar:/usr/lib/flume-ng/../search/lib/commons-io-2.4.jar:/usr/lib/flume-ng/../search/lib/commons-lang-2.6.jar:/usr/lib/flume-ng/../search/lib/commons-logging-1.1.3.jar:/usr/lib/flume-ng/../search/lib/commons-math3-3.1.1.jar:/usr/lib/flume-ng/../search/lib/commons-net-3.1.jar:/usr/lib/flume-ng/../search/lib/concurrentlinkedhashmap-lru-1.2.jar:/usr/lib/flume-ng/../search/lib/config-1.0.2.jar:/usr/lib/flume-ng/../search/lib/curator-client-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-framework-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-recipes-2.7.1.jar:/usr/lib/flume-ng/../search/lib/dom4j-1.6.1.jar:/usr/lib/flume-ng/../search/lib/fastutil-6.3.jar:/usr/lib/flume-ng/../search/lib/fontbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/gson-2.2.4.jar:/usr/lib/flume-ng/../search/lib/guava-11.0.2.jar:/usr/lib/flume-ng/../search/lib/hadoop-annotations.jar:/usr/lib/flume-ng/../search/lib/hadoop-auth.jar:/usr/lib/flume-ng/../search/lib/hadoop-common.jar:/usr/lib/flume-ng/../search/lib/hadoop-hdfs.jar:/usr/lib/flume-ng/../search/lib/hppc-0.5.2.jar:/usr/lib/flume-ng/../search/lib/htrace-core-3.2.0-incubating.jar:/usr/lib/flume-ng/../search/lib/htrace-core4-4.0.1-incubating.jar:/usr/lib/flume-ng/../search/lib/httpclient-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpcore-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpmime-4.2.5.jar:/usr/lib/flume-ng/../search/lib/isoparser-1.0-RC-1.jar:/usr/lib/flume-ng/../search/lib/jackson-annotations-2.3.0.jar:/usr/lib/flume-ng/../search/lib/jackson-core-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-core-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/jackson-databind-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/javax.servlet-3.0.0.v201112011016.jar:/usr/lib/flume-ng/../search/lib/jcl-over-slf4j-1.7.5.jar:/usr/lib/flume-ng/../search/lib/jdom-1.0.jar:/usr/lib/flume-ng/../search/lib/jempbox-1.8.4.jar:/usr/lib
/flume-ng/../search/lib/jersey-core-1.9.jar:/usr/lib/flume-ng/../search/lib/jersey-server-1.9.jar:/usr/lib/flume-ng/../search/lib/jetty-continuation-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-deploy-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-http-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-io-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-jmx-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-security-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-server-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-servlet-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-util-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-webapp-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-xml-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jhighlight-1.0.jar:/usr/lib/flume-ng/../search/lib/joda-time-1.6.jar:/usr/lib/flume-ng/../search/lib/jsch-0.1.42.jar:/usr/lib/flume-ng/../search/lib/jsr305-1.3.9.jar:/usr/lib/flume-ng/../search/lib/juniversalchardet-1.0.3.jar:/usr/lib/flume-ng/../search/lib/kite-hadoop-compatibility.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-parquet-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-rcfile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-sequencefile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-json.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-maxmind.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-metrics-servlets.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-saxon.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-cell.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-tika-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphl
ines-tika-decompress.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-twitter.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-useragent.jar:/usr/lib/flume-ng/../search/lib/leveldbjni-all-1.8.jar:/usr/lib/flume-ng/../search/lib/log4j-1.2.17.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-common.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-kuromoji.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-phonetic.jar:/usr/lib/flume-ng/../search/lib/lucene-codecs.jar:/usr/lib/flume-ng/../search/lib/lucene-core.jar:/usr/lib/flume-ng/../search/lib/lucene-expressions.jar:/usr/lib/flume-ng/../search/lib/lucene-grouping.jar:/usr/lib/flume-ng/../search/lib/lucene-highlighter.jar:/usr/lib/flume-ng/../search/lib/lucene-join.jar:/usr/lib/flume-ng/../search/lib/lucene-memory.jar:/usr/lib/flume-ng/../search/lib/lucene-misc.jar:/usr/lib/flume-ng/../search/lib/lucene-queries.jar:/usr/lib/flume-ng/../search/lib/lucene-queryparser.jar:/usr/lib/flume-ng/../search/lib/lucene-spatial.jar:/usr/lib/flume-ng/../search/lib/lucene-suggest.jar:/usr/lib/flume-ng/../search/lib/maxmind-db-1.0.0.jar:/usr/lib/flume-ng/../search/lib/metadata-extractor-2.6.2.jar:/usr/lib/flume-ng/../search/lib/metrics-core-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-healthchecks-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-json-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-jvm-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-servlets-3.0.2.jar:/usr/lib/flume-ng/../search/lib/netcdf-4.2-min.jar:/usr/lib/flume-ng/../search/lib/netty-3.10.5.Final.jar:/usr/lib/flume-ng/../search/lib/netty-all-4.0.23.Final.jar:/usr/lib/flume-ng/../search/lib/noggit-0.5.jar:/usr/lib/flume-ng/../search/lib/org.restlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/org.restlet.ext.servlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/paranamer-2.3.jar:/usr/lib/flume-ng/../search/lib/parquet-avro.jar:/usr/lib/flume-ng/../search/lib/parquet-column.jar:/usr/lib/flume-ng/../search/lib/parquet-common.jar:/usr/lib/flume-ng/
../search/lib/parquet-encoding.jar:/usr/lib/flume-ng/../search/lib/parquet-format.jar:/usr/lib/flume-ng/../search/lib/parquet-hadoop.jar:/usr/lib/flume-ng/../search/lib/parquet-jackson.jar:/usr/lib/flume-ng/../search/lib/pdfbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/poi-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-schemas-3.10.1.jar:/usr/lib/flume-ng/../search/lib/poi-scratchpad-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/protobuf-java-2.5.0.jar:/usr/lib/flume-ng/../search/lib/rome-0.9.jar:/usr/lib/flume-ng/../search/lib/Saxon-HE-9.5.1-5.jar:/usr/lib/flume-ng/../search/lib/snakeyaml-1.10.jar:/usr/lib/flume-ng/../search/lib/snappy-java-1.0.4.1.jar:/usr/lib/flume-ng/../search/lib/solr-cell.jar:/usr/lib/flume-ng/../search/lib/solr-core.jar:/usr/lib/flume-ng/../search/lib/solr-solrj.jar:/usr/lib/flume-ng/../search/lib/spatial4j-0.4.1.jar:/usr/lib/flume-ng/../search/lib/tagsoup-1.2.1.jar:/usr/lib/flume-ng/../search/lib/tika-core-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-parsers-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-xmp-1.5.jar:/usr/lib/flume-ng/../search/lib/ua-parser-1.3.0.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1-tests.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-tika-0.1.jar:/usr/lib/flume-ng/../search/lib/wstx-asl-3.2.7.jar:/usr/lib/flume-ng/../search/lib/xercesImpl-2.9.1.jar:/usr/lib/flume-ng/../search/lib/xml-apis-1.3.04.jar:/usr/lib/flume-ng/../search/lib/xmlbeans-2.6.0.jar:/usr/lib/flume-ng/../search/lib/xmlenc-0.52.jar:/usr/lib/flume-ng/../search/lib/xmpcore-5.1.2.jar:/usr/lib/flume-ng/../search/lib/xz-1.0.jar:/usr/lib/flume-ng/../search/lib/zookeeper.jar' -Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hbase/bin/../lib/native/Linux-amd64-64 org.apache.flume.node.Application -conf-file conf/flumekafka.conf --name agent1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-03-16 02:06:50,988 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-03-16 02:06:51,005 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flumekafka.conf
2018-03-16 02:06:51,025 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfs-sink Agent: agent1
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-03-16 02:06:51,140 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel memory-channel type memory
2018-03-16 02:06:51,171 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel memory-channel
2018-03-16 02:06:51,172 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source kafka-source, type org.apache.flume.source.kafka.KafkaSource
2018-03-16 02:06:51,209 (conf-file-poller-0) [INFO - org.apache.flume.source.kafka.KafkaSource.doConfigure(KafkaSource.java:400)] Group ID was not specified. Using flume as the group id.
2018-03-16 02:06:51,347 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: hdfs-sink, type: hdfs
2018-03-16 02:06:51,393 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel memory-channel connected to [kafka-source, hdfs-sink]
2018-03-16 02:06:51,436 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{kafka-source=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@598ef578 counterGroup:{ name:null counters:{} } }} channels:{memory-channel=org.apache.flume.channel.MemoryChannel{name: memory-channel}} }
2018-03-16 02:06:51,450 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel memory-channel
2018-03-16 02:06:51,586 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memory-channel: Successfully registered new MBean.
2018-03-16 02:06:51,587 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memory-channel started
2018-03-16 02:06:51,587 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink hdfs-sink
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: hdfs-sink started
2018-03-16 02:06:51,606 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source kafka-source
2018-03-16 02:06:51,607 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:496)] Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE}...
2018-03-16 02:06:51,701 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:171)] ConsumerConfig values:
    request.timeout.ms = 40000
    check.crcs = true
    retry.backoff.ms = 100
    ssl.truststore.password = null
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 65536
    ssl.cipher.suites = null
    ssl.key.password = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.provider = null
    sasl.kerberos.service.name = null
    session.timeout.ms = 30000
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [quickstart.cloudera:9092]
    client.id =
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    auto.offset.reset = latest
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    ssl.endpoint.identification.algorithm = null
    max.partition.fetch.bytes = 1048576
    ssl.keystore.location = null
    ssl.truststore.location = null
    ssl.keystore.password = null
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    security.protocol = PLAINTEXT
    auto.commit.interval.ms = 5000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    group.id = flume
    enable.auto.commit = false
    metric.reporters = []
    ssl.truststore.type = JKS
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
    heartbeat.interval.ms = 3000

2018-03-16 02:06:51,909 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration groupId = flume was supplied but isn't a known config.
2018-03-16 02:06:51,910 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration timeout.ms = 100 was supplied but isn't a known config.
2018-03-16 02:06:51,919 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka version : 0.9.0-kafka-2.0.2
2018-03-16 02:06:51,920 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)] Kafka commitId : unknown

 

 

 

No errors were encountered, but no messages are being written to HDFS. Could someone help? I have tried almost every combination of solutions available in the forum.

Thanks in advance,
MMG
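
The two WARN lines in the log above ("groupId = flume" and "timeout.ms = 100" were supplied but are not known configs), together with the "Group ID was not specified" INFO line, suggest that the Kafka consumer ignores those keys as written. The sketch below shows how the source section might be rewritten with the property names from the Flume 1.7 user guide. It is a starting point under those assumptions, not a confirmed fix; the auto.offset.reset line in particular is an optional assumption that only matters if messages were produced before the agent started.

```properties
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.kafka.bootstrap.servers = quickstart.cloudera:9092
agent1.sources.kafka-source.kafka.topics = Airports
# The Kafka consumer recognizes group.id, not groupId
agent1.sources.kafka-source.kafka.consumer.group.id = flume
# batchSize is a property of the source itself; note the dot after kafka-source
agent1.sources.kafka-source.batchSize = 1
agent1.sources.kafka-source.channels = memory-channel
# Assumption: read from the start of the topic when the group has no committed offset
# (the log above shows auto.offset.reset = latest)
agent1.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
```

The kafka.consumer.timeout.ms line is left out here because the log reports it as an unknown config for this consumer version.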