Support Questions

Find answers, ask questions, and share your expertise

Storm-Kafka on CDH 5.11 with kerberos security enabled.

Explorer

Hi,

 

I have installed Apache Storm 1.1.0 manually on CDH 5.11 cluster. This cluster is secured with kerberos. 

I have storm sample written which ingest data from kafka topic and inserts into HDFS directory in real time. So, this sample uses storm-kafka as well as storm-hdfs. 

When I run the storm topology it gives the following error in kafka-spout. 

2017-06-18 22:29:31.297 o.a.z.ClientCnxn Thread-14-kafka-spout-executor[5 5]-SendThread(localhost:2181) [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)

 

2017-06-18 22:29:31.571 k.c.SimpleConsumer Thread-14-kafka-spout-executor[5 5] [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [stormjar.jar:?]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) [stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) [storm-core-1.1.0.jar:1.1.0]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]

 

Kafka version: 2.1.1-1.2.1.1.p0.18

 

There is no storm-kafka*.jat present in - "/usr/local/storm"

But this sample was workin fine before kerberizing the cluster, even in this case.

 

 

I have tried the same example on Hortonworks and after adding the below code to set security protcol, the topology runs fine:

spoutConfig.securityProtocol = "SASL_PLAINTEXT";

After Adding above code in case of cloudera it gives error: "Symbol not found"

 

Please let me know if you nedd any other information...
Thanks in advance..

2 REPLIES 2

Super Collaborator
Storm might be including the old consumer kafka client, which doesn't support kerberos. Can you identify which version of kafka jars are being used by the storm client?


Are you also specifying the jaas.conf for the storm application?

-pd

Explorer

Hi @pdvorak,

 

Thanks for the reply..

 

Storm-kafka and storm-kafka client folder in "/usr/local/storm/external" just have README and there are no jars present in it. We have manually installed Storm using the below link

http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/

 

Storm_jaas.conf

 

StormServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/nimbus.service.keytab"
   storeKey=true
   useTicketCache=false
   principal="nimbus/mac1.cdh511ks.com@XXXX.COM";
};
StormClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/storm.headless.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="nimbus"
   principal="storm@XXXX.COM";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/storm.headless.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="storm@XXXX.COM";
};

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/storm.headless.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="storm@XXXX.COM";
};

Code Snippet of  Storm-kafka:

 

 BrokerHosts hosts = new ZkHosts(zkConnect);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, kafkaTopic, "/"
                + kafkaTopic, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //spoutConfig.securityProtocol = "SASL_PLAINTEXT";
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

The commentd section is working correctly in HDP as it is taking securityProtocol. But for cloudera it is giving me error : "Symbol not found"

 

We are creating an Uber JAR for our sample, for which we are using below mentioned versions for Kafka and Storm JARs that are downloaded from Cloudera repo

 

<repo.id>cloudera</repo.id>
    <repo.url>https://repository.cloudera.com/artifactory/cloudera-repos</repo.url>

    <!-- Versions for Kafka/Storm jar dependencies -->
    <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
    <kafka.version>0.10.0-kafka-2.1.1</kafka.version>
    <storm.version>1.1.0</storm.version>
    <storm.kafka.version>1.1.0</storm.kafka.version>
    <storm.hdfs.version>1.1.0</storm.hdfs.version>


we are getting below error:

2017-06-21 01:09:07.605 o.a.z.ClientCnxn Thread-14-kafka-spout-executor[5 5]-SendThread(localhost:2181) [WARN] Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_45]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_45]
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[stormjar.jar:?]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) [stormjar.jar:?]
2017-06-21 01:09:07.614 o.a.c.f.s.ConnectionStateManager Thread-14-kafka-spout-executor[5 5]-EventThread [INFO] State change: CONNECTED
2017-06-21 01:09:07.714 o.a.z.ClientCnxn Thread-14-kafka-spout-executor[5 5]-SendThread(localhost:2181) [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)

 

In Hortonworks, they have modified code in Kafka and Storm(kafka-spout) to work with Kerberos security.

Link for reference: https://community.hortonworks.com/questions/53787/connect-storm-to-secured-kafka-with-kafka-spout.ht...

 

Does Apache Storm 1.1.0 have the functionality to work with Kerberized Kafka cluster?

 

 

Please let me know if you require any more info..
Thanks in advance..

Take a Tour of the Community
Don't have an account?
Your experience may be limited. Sign in to explore more.