Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Running HDP 2.5 with kerberos - storm doesn't receive kafka messages

avatar
New Contributor

We are running HDP 2.5 with kerberos. We see our messages in the kerberized kafka bus but storm spout doesn't pick it up.

1 ACCEPTED SOLUTION

avatar

@Dhiraj Sardana

This is a known issue which got fixed recently. Please change your storm-kafka dependency to the following and give it a try.

<dependency> 
 <groupId>org.apache.storm</groupId> 
 <artifactId>storm-kafka</artifactId> 
 <version>1.0.1.2.5.3.0-37</version> 
 </dependency>

View solution in original post

8 REPLIES 8

avatar

Can you please share the snippet which does the kafka spout configuration. Also please refer to Storm UI to get details about the kafka spout spawned host and the corresponding logs for the same.

avatar
New Contributor

This is spout configuration in the code:

Properties spoutProps = <strong>new</strong> Properties();
spoutProps.put("metadata.broker.list", props.getProperty(Constants.<strong><em>KAFKA_HOSTS_KEY</em></strong>));
spoutProps.put("serializer.class", "kafka.serializer.StringEncoder");
spoutProps.put("security.protocol", "SASL_PLAINTEXT");
config.put(KafkaBolt.<strong><em>KAFKA_BROKER_PROPERTIES</em></strong>, spoutProps);

could you please be more specific as to how to get details about kafka spout spawned host?

avatar
Contributor

We're getting same issue and we provided 'PLAINTEXTSASL' as security protocol in spout config.

We checked that when storm kafka code creates kafka consumer, it calls first constructor of SimpleConsumer which sets the protocol to unsecured one(PLAINTEXT) , rather than calling second constructor which takes security protocol as parameter.

For details , please refer : https://github.com/hortonworks/storm-release/blob/HDP-2.5.0.0-tag/external/storm-kafka/src/jvm/org/a...

We're analyzing for solving the same..Any help is highly appriciated.

avatar
Master Mentor

avatar
Contributor

@Artem Ervits : I am following above guidelines.But i found that issue is that storm-kafka code is setting security protocol to unsecured one even if config is set to secured one.I checked by debugging that In PartitionConnection class instance, config is secured , but it is creating consumer with default(unsecured) protocol[Added more details in my previous comment].

And it is failing while getting offset as SimpleConsumer is trying to connect by unsecured protocol.

So storm-kafka library should call another constructor of consumer which takes protocol as parameter..or if am still doing something wrong?

PS: I tried by setting protocol to secured one while debugging the code, then i am getting Jaas related error, but at least it tries to connect in secure way to kafka.

avatar
Contributor

@Larisa : We managed to fix this issue and messages are being consumed by kafka. We did following changes:

1) As Storm-kafka code(DynamicPartitionConnections class) is not supporting to communicate as PLAINTEXTSASL protocol. So for time being we modified that class in our codeto support secured protocol.

@Artem : Can you please check if this change can be added in storm-kafka library.

2) Added permission to storm service user(which you can check in storm jaas file) for kafka topic via ranger or kafka ACL command.

3) Somehow value for host in brokers metadata on zookeeper was null. So we updated that value with broker hostname.

Hopefully this will help you.

avatar
Master Mentor

avatar

@Dhiraj Sardana

This is a known issue which got fixed recently. Please change your storm-kafka dependency to the following and give it a try.

<dependency> 
 <groupId>org.apache.storm</groupId> 
 <artifactId>storm-kafka</artifactId> 
 <version>1.0.1.2.5.3.0-37</version> 
 </dependency>