
Problem about Configuring Flume as Kafka Consumer

Re: Problem about Configuring Flume as Kafka Consumer

Super Collaborator
That's odd that the VM is read-only. Are you making the change in CM for the Flume logging safety valve?

-pd

Re: Problem about Configuring Flume as Kafka Consumer

Expert Contributor
tier1.sources.source1.zookeeperConnect is deprecated;
use tier1.sources.kafkasource1.kafka.bootstrap.servers = bda03:9092,bda04:9092,bda05:9092 instead.
Em Jay
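
For reference, a minimal sketch of how the deprecated Kafka source properties map onto the newer ones, assuming the property names documented in the Flume User Guide for the Kafka source (Flume 1.7 and later). The agent and source names (tier1, source1), the ZooKeeper host and the topic are placeholders for illustration, not values taken from this thread.

```properties
# Deprecated style, shown commented out for comparison
# (zk01:2181 and mytopic are hypothetical values):
# tier1.sources.source1.zookeeperConnect = zk01:2181
# tier1.sources.source1.topic = mytopic
# tier1.sources.source1.groupId = flume

# Newer style: the source connects to the brokers directly.
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.kafka.bootstrap.servers = bda03:9092,bda04:9092,bda05:9092
tier1.sources.source1.kafka.topics = mytopic
tier1.sources.source1.kafka.consumer.group.id = flume
```

Whichever source name is used, it has to be the same on every line and has to appear in the agent's sources list.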

Re: Problem about Configuring Flume as Kafka Consumer

Super Collaborator
As Manikumar noted above, the old Flume agent configuration has been deprecated. You can refer to the Flume Kafka source documentation here:
http://flume.apache.org/FlumeUserGuide.html

Also, you can confirm with ConsumerGroupCommand that the Flume agents are in an active consumer group:

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server ${HOSTNAME}:9092 --describe --group flume --new-consumer

-pd

Re: Problem about Configuring Flume as Kafka Consumer

New Contributor

Hi all,

I have been trying to ingest data from a Kafka source with Flume. I am using the Cloudera QuickStart VM 5.13.

My Flume config file is this:

 agent1.sources = kafka-source
 agent1.channels = memory-channel
 agent1.sinks = hdfs-sink

 agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
 agent1.sources.kafka-source.kafka.bootstrap.servers = quickstart.cloudera:9092
 agent1.sources.kafka-source.kafka.topics = Airports
 agent1.sources.kafka-source.kafka.consumer.groupId = flume
 agent1.sources.kafka-source-kafka.batchSize = 1
 agent1.sources.kafka-source.channels = memory-channel
 agent1.sources.kafka-source.interceptors = i1
 agent1.sources.kafka-source.interceptors.i1.type = timestamp
 agent1.sources.kafka-source.interceptors.i1.preserveExisting = false
 agent1.sources.kafka-source.interceptors.i1.hostHeader = key
 agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
 agent1.sources.kafka-source.kafka.enable.auto.commit = true

 agent1.channels.memory-channel.type = memory
 agent1.channels.memory-channel.capacity = 10000
 agent1.channels.memory-channel.transactionCapacity = 1000
 
 agent1.sinks.hdfs-sink.type = hdfs
 agent1.sinks.hdfs-sink.hdfs.path = hdfs://quickstart.cloudera:8020/tmp/kafka/
 agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
 agent1.sinks.hdfs-sink.hdfs.rollSize = 1
 agent1.sinks.hdfs-sink.hdfs.rollCount = 0
 agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
 agent1.sinks.hdfs-sink.channel = memory-channel

The Flume command I ran is:

./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console

I ran it from the /usr/lib/flume-ng directory.

The Kafka producer command and log are as below:

[cloudera@quickstart Desktop]$ /usr/bin/kafka-console-producer --broker-list quickstart.cloudera:9092 --topic Airports
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/03/16 02:07:09 INFO producer.ProducerConfig: ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [quickstart.cloudera:9092]
    buffer.memory = 33554432
    client.id = console-producer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 1000
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 1500
    retries = 3
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
18/03/16 02:07:10 INFO utils.AppInfoParser: Kafka commitId : unknown
>hi
>this is a new message
>

The Flume command and log are as follows:

[cloudera@quickstart flume-ng]$ ./bin/flume-ng agent --conf conf -conf-file conf/flumekafka.conf --name agent1 -Dflume.root.logger=INFO,console
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/jdk1.7.0_67-cloudera/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/usr/lib/flume-ng/conf:/usr/lib/flume-ng/lib/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/usr/lib/hbase/bin/../conf:/usr/java/jdk1.7.0_67-cloudera/lib/tools.jar:/usr/lib/hbase/bin/..:/usr/lib/hbase/bin/../lib/activation-1.1.jar:/usr/lib/hbase/bin/../lib/apacheds-i18n-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/hbase/bin/../lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/api-util-1.0.0-M20.jar:/usr/lib/hbase/bin/../lib/asm-3.2.jar:/usr/lib/hbase/bin/../lib/avro.jar:/usr/lib/hbase/bin/../lib/aws-java-sdk-bundle-1.11.134.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-1.9.2.jar:/usr/lib/hbase/bin/../lib/commons-beanutils-core-1.8.0.jar:/usr/lib/hbase/bin/../lib/commons-cli-1.2.jar:/usr/lib/hbase/bin/../lib/commons-codec-1.9.jar:/usr/lib/hbase/bin/../lib/commons-collections-3.2.2.jar:/usr/lib/hbase/bin/../lib/commons-compress-1.4.1.jar:/usr/lib/hbase/bin/../lib/commons-configuration-1.6.jar:/usr/lib/hbase/bin/../lib/commons-daemon-1.0.13.jar:/usr/lib/hbase/bin/../lib/commons-digester-1.8.jar:/usr/lib/hbase/bin/../lib/commons-el-1.0.jar:/usr/lib/hbase/bin/../lib/commons-httpclient-3.1.jar:/usr/lib/hbase/bin/../lib/commons-io-2.4.jar:/usr/lib/hbase/bin/../lib/commons-lang-2.6.jar:/usr/lib/hbase/bin/../lib/commons-logging-1.2.jar:/usr/lib/hbase/bin/../lib/commons-math-2.1.jar:/usr/lib/hbase/bin/../lib/commons-math3-3.1.1.jar:/usr/lib/hbase/bin/../lib/commons-net-3.1.jar:/usr/lib/hbase/bin/../lib/core-3.1.1.jar:/usr/lib/hbase/bin/../lib/curator-client-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-framework-2.7.1.jar:/usr/lib/hbase/bin/../lib/curator-recipes-2.7.1.jar:/usr/lib/hbase/bin
/../lib/disruptor-3.3.0.jar:/usr/lib/hbase/bin/../lib/findbugs-annotations-1.3.9-1.jar:/usr/lib/hbase/bin/../lib/gson-2.2.4.jar:/usr/lib/hbase/bin/../lib/guava-12.0.1.jar:/usr/lib/hbase/bin/../lib/hamcrest-core-1.3.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-annotations-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-client-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-common-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-examples-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-external-blockcache-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop2-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-hadoop-compat-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-it-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-prefix-tree-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-procedure-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-protocol-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-resource-bundle-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rest-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-rsgroup-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-server-1.2.0-cdh5.13.0-tests.jar:/usr/lib/hbase/bin/../lib/hbase-shell-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-spark-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/hbase-thrift-1.2.0-cdh5.13.0.jar:/usr/lib/hbase/bin/../lib/high-scale-lib-1.1.1.jar:/usr/lib/hbase/bin/../lib/hsqldb-1.8.0.10.jar:/usr/lib/hbase/bin/../lib/htrace-core-3.2.0-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core4-4.0.1
-incubating.jar:/usr/lib/hbase/bin/../lib/htrace-core.jar:/usr/lib/hbase/bin/../lib/httpclient-4.2.5.jar:/usr/lib/hbase/bin/../lib/httpcore-4.2.5.jar:/usr/lib/hbase/bin/../lib/jackson-annotations-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-core-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-databind-2.2.3.jar:/usr/lib/hbase/bin/../lib/jackson-jaxrs-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/hbase/bin/../lib/jackson-xc-1.8.8.jar:/usr/lib/hbase/bin/../lib/jamon-runtime-2.4.1.jar:/usr/lib/hbase/bin/../lib/jasper-compiler-5.5.23.jar:/usr/lib/hbase/bin/../lib/jasper-runtime-5.5.23.jar:/usr/lib/hbase/bin/../lib/java-xmlbuilder-0.4.jar:/usr/lib/hbase/bin/../lib/jaxb-api-2.1.jar:/usr/lib/hbase/bin/../lib/jaxb-impl-2.2.3-1.jar:/usr/lib/hbase/bin/../lib/jcodings-1.0.8.jar:/usr/lib/hbase/bin/../lib/jersey-client-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-core-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-json-1.9.jar:/usr/lib/hbase/bin/../lib/jersey-server-1.9.jar:/usr/lib/hbase/bin/../lib/jets3t-0.9.0.jar:/usr/lib/hbase/bin/../lib/jettison-1.3.3.jar:/usr/lib/hbase/bin/../lib/jetty-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-sslengine-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/jetty-util-6.1.26.cloudera.4.jar:/usr/lib/hbase/bin/../lib/joni-2.1.2.jar:/usr/lib/hbase/bin/../lib/jruby-cloudera-1.0.0.jar:/usr/lib/hbase/bin/../lib/jsch-0.1.42.jar:/usr/lib/hbase/bin/../lib/jsp-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1-6.1.14.jar:/usr/lib/hbase/bin/../lib/jsp-api-2.1.jar:/usr/lib/hbase/bin/../lib/junit-4.12.jar:/usr/lib/hbase/bin/../lib/leveldbjni-all-1.8.jar:/usr/lib/hbase/bin/../lib/libthrift-0.9.3.jar:/usr/lib/hbase/bin/../lib/log4j-1.2.17.jar:/usr/lib/hbase/bin/../lib/metrics-core-2.2.0.jar:/usr/lib/hbase/bin/../lib/netty-all-4.0.23.Final.jar:/usr/lib/hbase/bin/../lib/paranamer-2.3.jar:/usr/lib/hbase/bin/../lib/protobuf-java-2.5.0.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5-6.1.14
.jar:/usr/lib/hbase/bin/../lib/servlet-api-2.5.jar:/usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar:/usr/lib/hbase/bin/../lib/slf4j-log4j12.jar:/usr/lib/hbase/bin/../lib/snappy-java-1.0.4.1.jar:/usr/lib/hbase/bin/../lib/spymemcached-2.11.6.jar:/usr/lib/hbase/bin/../lib/xmlenc-0.52.jar:/usr/lib/hbase/bin/../lib/xz-1.0.jar:/usr/lib/hbase/bin/../lib/zookeeper.jar:/etc/hadoop/conf:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/.//*:/usr/lib/hadoop-hdfs/./:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/.//*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*:/etc/hadoop/conf/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:/usr/lib/zookeeper/*:/usr/lib/zookeeper/lib/*::/conf:/lib/*:/usr/lib/flume-ng/../search/lib/antlr-2.7.7.jar:/usr/lib/flume-ng/../search/lib/antlr-runtime-3.4.jar:/usr/lib/flume-ng/../search/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-core-0.7.2.jar:/usr/lib/flume-ng/../search/lib/apache-mime4j-dom-0.7.2.jar:/usr/lib/flume-ng/../search/lib/api-asn1-api-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/api-util-1.0.0-M20.jar:/usr/lib/flume-ng/../search/lib/argparse4j-0.4.3.jar:/usr/lib/flume-ng/../search/lib/asm-3.2.jar:/usr/lib/flume-ng/../search/lib/asm-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-commons-4.1.jar:/usr/lib/flume-ng/../search/lib/asm-debug-all-4.1.jar:/usr/lib/flume-ng/../search/lib/aspectjrt-1.6.5.jar:/usr/lib/flume-ng/../search/lib/avro.jar:/usr/lib/flume-ng/../search/lib/bcmail-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/bcprov-jdk15-1.45.jar:/usr/lib/flume-ng/../search/lib/boilerpipe-1.1.0.jar:/usr/lib/flume-ng/../search/lib/commons-cli-1.2.jar:/usr/lib/flume-ng/../search/lib/commons-codec-1.4.jar:/usr/lib/flume-ng/../search/lib/commons-collections-3.2.2.jar:/usr/lib/flume-ng/../search/lib/commons-compress-1.4.1.jar:/usr/lib/flume-ng/../search/lib/commons-configuration-1.6.jar:/usr/lib/flume-ng/../search/lib/
commons-el-1.0.jar:/usr/lib/flume-ng/../search/lib/commons-fileupload-1.3.2.jar:/usr/lib/flume-ng/../search/lib/commons-io-2.4.jar:/usr/lib/flume-ng/../search/lib/commons-lang-2.6.jar:/usr/lib/flume-ng/../search/lib/commons-logging-1.1.3.jar:/usr/lib/flume-ng/../search/lib/commons-math3-3.1.1.jar:/usr/lib/flume-ng/../search/lib/commons-net-3.1.jar:/usr/lib/flume-ng/../search/lib/concurrentlinkedhashmap-lru-1.2.jar:/usr/lib/flume-ng/../search/lib/config-1.0.2.jar:/usr/lib/flume-ng/../search/lib/curator-client-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-framework-2.7.1.jar:/usr/lib/flume-ng/../search/lib/curator-recipes-2.7.1.jar:/usr/lib/flume-ng/../search/lib/dom4j-1.6.1.jar:/usr/lib/flume-ng/../search/lib/fastutil-6.3.jar:/usr/lib/flume-ng/../search/lib/fontbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/gson-2.2.4.jar:/usr/lib/flume-ng/../search/lib/guava-11.0.2.jar:/usr/lib/flume-ng/../search/lib/hadoop-annotations.jar:/usr/lib/flume-ng/../search/lib/hadoop-auth.jar:/usr/lib/flume-ng/../search/lib/hadoop-common.jar:/usr/lib/flume-ng/../search/lib/hadoop-hdfs.jar:/usr/lib/flume-ng/../search/lib/hppc-0.5.2.jar:/usr/lib/flume-ng/../search/lib/htrace-core-3.2.0-incubating.jar:/usr/lib/flume-ng/../search/lib/htrace-core4-4.0.1-incubating.jar:/usr/lib/flume-ng/../search/lib/httpclient-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpcore-4.2.5.jar:/usr/lib/flume-ng/../search/lib/httpmime-4.2.5.jar:/usr/lib/flume-ng/../search/lib/isoparser-1.0-RC-1.jar:/usr/lib/flume-ng/../search/lib/jackson-annotations-2.3.0.jar:/usr/lib/flume-ng/../search/lib/jackson-core-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-core-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/jackson-databind-2.3.1.jar:/usr/lib/flume-ng/../search/lib/jackson-mapper-asl-1.8.8.jar:/usr/lib/flume-ng/../search/lib/javax.servlet-3.0.0.v201112011016.jar:/usr/lib/flume-ng/../search/lib/jcl-over-slf4j-1.7.5.jar:/usr/lib/flume-ng/../search/lib/jdom-1.0.jar:/usr/lib/flume-ng/../search/lib/jempbox-1.8.4.jar:/usr/lib
/flume-ng/../search/lib/jersey-core-1.9.jar:/usr/lib/flume-ng/../search/lib/jersey-server-1.9.jar:/usr/lib/flume-ng/../search/lib/jetty-continuation-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-deploy-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-http-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-io-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-jmx-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-security-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-server-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-servlet-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-util-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-webapp-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jetty-xml-8.1.10.v20130312.jar:/usr/lib/flume-ng/../search/lib/jhighlight-1.0.jar:/usr/lib/flume-ng/../search/lib/joda-time-1.6.jar:/usr/lib/flume-ng/../search/lib/jsch-0.1.42.jar:/usr/lib/flume-ng/../search/lib/jsr305-1.3.9.jar:/usr/lib/flume-ng/../search/lib/juniversalchardet-1.0.3.jar:/usr/lib/flume-ng/../search/lib/kite-hadoop-compatibility.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-parquet-avro.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-rcfile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-hadoop-sequencefile.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-json.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-maxmind.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-metrics-servlets.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-saxon.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-cell.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-solr-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-tika-core.jar:/usr/lib/flume-ng/../search/lib/kite-morphl
ines-tika-decompress.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-twitter.jar:/usr/lib/flume-ng/../search/lib/kite-morphlines-useragent.jar:/usr/lib/flume-ng/../search/lib/leveldbjni-all-1.8.jar:/usr/lib/flume-ng/../search/lib/log4j-1.2.17.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-common.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-kuromoji.jar:/usr/lib/flume-ng/../search/lib/lucene-analyzers-phonetic.jar:/usr/lib/flume-ng/../search/lib/lucene-codecs.jar:/usr/lib/flume-ng/../search/lib/lucene-core.jar:/usr/lib/flume-ng/../search/lib/lucene-expressions.jar:/usr/lib/flume-ng/../search/lib/lucene-grouping.jar:/usr/lib/flume-ng/../search/lib/lucene-highlighter.jar:/usr/lib/flume-ng/../search/lib/lucene-join.jar:/usr/lib/flume-ng/../search/lib/lucene-memory.jar:/usr/lib/flume-ng/../search/lib/lucene-misc.jar:/usr/lib/flume-ng/../search/lib/lucene-queries.jar:/usr/lib/flume-ng/../search/lib/lucene-queryparser.jar:/usr/lib/flume-ng/../search/lib/lucene-spatial.jar:/usr/lib/flume-ng/../search/lib/lucene-suggest.jar:/usr/lib/flume-ng/../search/lib/maxmind-db-1.0.0.jar:/usr/lib/flume-ng/../search/lib/metadata-extractor-2.6.2.jar:/usr/lib/flume-ng/../search/lib/metrics-core-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-healthchecks-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-json-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-jvm-3.0.2.jar:/usr/lib/flume-ng/../search/lib/metrics-servlets-3.0.2.jar:/usr/lib/flume-ng/../search/lib/netcdf-4.2-min.jar:/usr/lib/flume-ng/../search/lib/netty-3.10.5.Final.jar:/usr/lib/flume-ng/../search/lib/netty-all-4.0.23.Final.jar:/usr/lib/flume-ng/../search/lib/noggit-0.5.jar:/usr/lib/flume-ng/../search/lib/org.restlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/org.restlet.ext.servlet-2.1.4.jar:/usr/lib/flume-ng/../search/lib/paranamer-2.3.jar:/usr/lib/flume-ng/../search/lib/parquet-avro.jar:/usr/lib/flume-ng/../search/lib/parquet-column.jar:/usr/lib/flume-ng/../search/lib/parquet-common.jar:/usr/lib/flume-ng/
../search/lib/parquet-encoding.jar:/usr/lib/flume-ng/../search/lib/parquet-format.jar:/usr/lib/flume-ng/../search/lib/parquet-hadoop.jar:/usr/lib/flume-ng/../search/lib/parquet-jackson.jar:/usr/lib/flume-ng/../search/lib/pdfbox-1.8.4.jar:/usr/lib/flume-ng/../search/lib/poi-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/poi-ooxml-schemas-3.10.1.jar:/usr/lib/flume-ng/../search/lib/poi-scratchpad-3.10-beta2.jar:/usr/lib/flume-ng/../search/lib/protobuf-java-2.5.0.jar:/usr/lib/flume-ng/../search/lib/rome-0.9.jar:/usr/lib/flume-ng/../search/lib/Saxon-HE-9.5.1-5.jar:/usr/lib/flume-ng/../search/lib/snakeyaml-1.10.jar:/usr/lib/flume-ng/../search/lib/snappy-java-1.0.4.1.jar:/usr/lib/flume-ng/../search/lib/solr-cell.jar:/usr/lib/flume-ng/../search/lib/solr-core.jar:/usr/lib/flume-ng/../search/lib/solr-solrj.jar:/usr/lib/flume-ng/../search/lib/spatial4j-0.4.1.jar:/usr/lib/flume-ng/../search/lib/tagsoup-1.2.1.jar:/usr/lib/flume-ng/../search/lib/tika-core-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-parsers-1.5.jar:/usr/lib/flume-ng/../search/lib/tika-xmp-1.5.jar:/usr/lib/flume-ng/../search/lib/ua-parser-1.3.0.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-core-0.1-tests.jar:/usr/lib/flume-ng/../search/lib/vorbis-java-tika-0.1.jar:/usr/lib/flume-ng/../search/lib/wstx-asl-3.2.7.jar:/usr/lib/flume-ng/../search/lib/xercesImpl-2.9.1.jar:/usr/lib/flume-ng/../search/lib/xml-apis-1.3.04.jar:/usr/lib/flume-ng/../search/lib/xmlbeans-2.6.0.jar:/usr/lib/flume-ng/../search/lib/xmlenc-0.52.jar:/usr/lib/flume-ng/../search/lib/xmpcore-5.1.2.jar:/usr/lib/flume-ng/../search/lib/xz-1.0.jar:/usr/lib/flume-ng/../search/lib/zookeeper.jar' -Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hbase/bin/../lib/native/Linux-amd64-64 org.apache.flume.node.Application -conf-file conf/flumekafka.conf --name agent1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-03-16 02:06:50,988 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-03-16 02:06:51,005 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flumekafka.conf
2018-03-16 02:06:51,025 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfs-sink Agent: agent1
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
2018-03-16 02:06:51,095 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-03-16 02:06:51,140 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel memory-channel type memory
2018-03-16 02:06:51,171 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel memory-channel
2018-03-16 02:06:51,172 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source kafka-source, type org.apache.flume.source.kafka.KafkaSource
2018-03-16 02:06:51,209 (conf-file-poller-0) [INFO - org.apache.flume.source.kafka.KafkaSource.doConfigure(KafkaSource.java:400)] Group ID was not specified. Using flume as the group id.
2018-03-16 02:06:51,347 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: hdfs-sink, type: hdfs
2018-03-16 02:06:51,393 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel memory-channel connected to [kafka-source, hdfs-sink]
2018-03-16 02:06:51,436 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{kafka-source=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@598ef578 counterGroup:{ name:null counters:{} } }} channels:{memory-channel=org.apache.flume.channel.MemoryChannel{name: memory-channel}} }
2018-03-16 02:06:51,450 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel memory-channel
2018-03-16 02:06:51,586 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memory-channel: Successfully registered new MBean.
2018-03-16 02:06:51,587 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memory-channel started
2018-03-16 02:06:51,587 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink hdfs-sink
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
2018-03-16 02:06:51,589 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: hdfs-sink started
2018-03-16 02:06:51,606 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source kafka-source
2018-03-16 02:06:51,607 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:496)] Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source,state:IDLE}...
2018-03-16 02:06:51,701 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:171)] ConsumerConfig values:
    request.timeout.ms = 40000
    check.crcs = true
    retry.backoff.ms = 100
    ssl.truststore.password = null
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 65536
    ssl.cipher.suites = null
    ssl.key.password = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.provider = null
    sasl.kerberos.service.name = null
    session.timeout.ms = 30000
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [quickstart.cloudera:9092]
    client.id =
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    auto.offset.reset = latest
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    ssl.endpoint.identification.algorithm = null
    max.partition.fetch.bytes = 1048576
    ssl.keystore.location = null
    ssl.truststore.location = null
    ssl.keystore.password = null
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    security.protocol = PLAINTEXT
    auto.commit.interval.ms = 5000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    group.id = flume
    enable.auto.commit = false
    metric.reporters = []
    ssl.truststore.type = JKS
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
    heartbeat.interval.ms = 3000

2018-03-16 02:06:51,909 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration groupId = flume was supplied but isn't a known config.
2018-03-16 02:06:51,910 (lifecycleSupervisor-1-2) [WARN - org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:179)] The configuration timeout.ms = 100 was supplied but isn't a known config.
2018-03-16 02:06:51,919 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)] Kafka version : 0.9.0-kafka-2.0.2
2018-03-16 02:06:51,920 (lifecycleSupervisor-1-2) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)] Kafka commitId : unknown

There are no errors, but no messages are being written to HDFS. Could someone help? I have tried almost every combination of the solutions suggested in the forum.

Thanks in advance,
MMG
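
The two WARN lines in the Flume log above ("groupId = flume was supplied but isn't a known config" and "timeout.ms = 100 was supplied but isn't a known config"), together with "Group ID was not specified", suggest that some of the source properties are not being recognized. Below is a sketch of the source section using the property names documented in the Flume User Guide for the Kafka source. It assumes the Flume build in CDH 5.13 follows those names, which is consistent with kafka.bootstrap.servers being picked up in the log. It addresses the warnings only and may not be the root cause of the missing HDFS output.

```properties
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.kafka.bootstrap.servers = quickstart.cloudera:9092
agent1.sources.kafka-source.kafka.topics = Airports
# "group.id" rather than "groupId": the log reports groupId as an unknown config.
agent1.sources.kafka-source.kafka.consumer.group.id = flume
# batchSize belongs to the source itself. The posted key used
# "kafka-source-kafka" (hyphen), which names a source that is not defined.
agent1.sources.kafka-source.batchSize = 1
agent1.sources.kafka-source.channels = memory-channel
# Optional assumption: start from the beginning of the topic when the group
# has no committed offsets (the log shows the effective value was "latest").
agent1.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
```

Two properties from the posted config are left out on purpose: kafka.consumer.timeout.ms is reported as unknown by the consumer, and kafka.enable.auto.commit lacks the kafka.consumer. prefix, so it never reached the consumer (the log shows enable.auto.commit = false). On the sink side, hdfs.rollSize is measured in bytes, so a value of 1 would likely roll a new file on almost every event; a larger value, or 0 to disable size-based rolling, may be worth trying.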