Support Questions
Find answers, ask questions, and share your expertise

FLUME + Kafka skin in hpd 2.3.4.7

New Contributor

We would like to get help from you for one challenge in our HDP 2.3.4 Environment. Please guide us to resolve issue.

We install HDP 2.3.4 version and configured HA for master services (Namenode, Resource Manager, HIVE, Hbase, oozie), Kerberos (Active directory), Ranger and Knox for HDP services.

We tried to feed input to Kafka topics through flume application. It is failing after enable Kerberos.

We create require policy on Ranger for users. The same user is able to access Kafka producer and consumer through Kafka commands.

The same user is able to execute flume relate task expect input feed to Kafka topics task.

I am attached two text file which has flume configuration details and error message here. Please find this and help us.

We noticed that, once we enabled Kerberos, the Kafka broker details in zookeeper value is changed like below. Please confirm is it right or not.

[zk: host1.test.com(CONNECTED) 1] get /brokers/ids/1003

{"jmx_port":-1,"timestamp":"1461845457345","endpoints":["PLAINTEXTSASL://host4.test.com:6667"],"host":null,"version":2,"port":-1}

cZxid = 0x600000062

ctime = Thu Apr 28 12:11:02 UTC 2016

mZxid = 0x600000062

mtime = Thu Apr 28 12:11:02 UTC 2016

pZxid = 0x600000062

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x2545cc704190001

dataLength = 138

numChildren = 0

[zk: host1.test.com(CONNECTED) 2] quit

flume conf file details:

tier1.sources = source1

tier1.channels = channel1

tier1.sinks = sink1

tier1.sources.source1.type = exec

tier1.sources.source1.command = /usr/bin/vmstat

tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory

tier1.channels.channel1.capacity = 10000

tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink

tier1.sinks.sink1.topic=testtopic

tier1.sinks.sink1.brokerList=host4.test.com:6667

tier1.sinks.sink1.channel=channel1

tier1.sinks.sink1.max.message.size=1000000

tier1.sinks.sink1.requiredAcks=1 tier1.sinks.sink1.batchSize=20000

tier1.sinks.skink1.security.protocol=PLAINTEXTSASL

tier1.sinks.skink1.serializer.class=kafka.serializer.StringEncoder

tier1.sinks.skin1.controller.socket.timeout.ms=30000

tier1.sinks.skin1.request.timeout.ms=30000

tier1.sinks.skin1.zookeeper.session.timeout.ms=60000

tier1.sinks.skin1.zookeeper.connection.timeout.ms=60000

tier1.sinks.skin1.topic.partitions=0

tier1.sinks.skin1.socket.request.max.bytes=1048576000

tier1.sinks.skin1.socket.send.buffer.bytes=1024000

tier1.sinks.skin1.socket.receive.buffer.bytes=1024000

tier1.sinks.skin1.client.id=console-consumer-75344

tier1.sinks.skin1.auto.commit.enable=false

tier1.sinks.skin1.dual.commit.enabled=false

0 REPLIES 0