Storm bolt is not connected with Kafka look like some error

Explorer

I am trying to connect my Storm spout to Kafka and am getting an error. The code is below, followed by the error I get when I run this topology. I am running on HDP 2.2.4.2-2 and compiling my code on Windows 7 with Eclipse.

package com.storm;


import java.util.UUID;


import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;




public class storm123 
{


	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException 
	{
		// create an instance of TopologyBuilder class
		TopologyBuilder builder = new TopologyBuilder();
		
		
		BrokerHosts hosts = new ZkHosts("sandbox.hortonworks.com:2181");
		SpoutConfig spoutConfig = new SpoutConfig(hosts, "testing2", "/testing2", UUID.randomUUID().toString());
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);		
		builder.setSpout("myKafkaSpout", kafkaSpout, 1);
		
		// set the spout class
		//builder.setSpout("LearningStormSpout", new stormspout(), 2);
		
		// set the bolt class
		builder.setBolt("LearningStormBolt", new stormbolt(), 4).shuffleGrouping ("myKafkaSpout");
		Config conf = new Config();
		conf.setDebug(true);
		
		
		// create an instance of LocalCluster class for
		// executing topology in local mode.
		LocalCluster cluster = new LocalCluster();
		
		// LearningStormToplogy is the name of the submitted topology
		cluster.submitTopology("LearningStormToplogy", conf, builder.createTopology());
		try {
			Thread.sleep(70000);
		} catch (Exception exception) {
			System.out.println("Thread interrupted exception : " + exception);
		}
		
		// kill the LearningStormTopology
		cluster.killTopology("LearningStormToplogy");
		
		// shutdown the storm test cluster
		cluster.shutdown();
	}


}

And here is the error I get at the end when I run this code:

20848 [Thread-16-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink] INFO  backtype.storm.daemon.executor - Prepared bolt __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:(6)
20854 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
        at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
        at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20855 [Thread-18-myKafkaSpout] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
        at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
        at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20900 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:322) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__5425$fn__5426.invoke(worker.clj:491) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at backtype.storm.daemon.executor$mk_executor_data$fn__4850$fn__4851.invoke(executor.clj:245) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at backtype.storm.util$async_loop$fn__452.invoke(util.clj:475) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
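
A note on reading this trace: a NoSuchMethodError means a class was compiled against a version of a library that is not the one found at runtime. Here curator-framework-3.0.0 calls a QuorumMaj(Map) constructor that the ZooKeeper jar on the classpath does not provide, so checking which jars those classes actually come from is a reasonable first step. A minimal sketch of such a check follows; ClasspathProbe, locate and hasConstructor are hypothetical helper names made up for illustration, and only standard JDK reflection is used.

```java
import java.security.CodeSource;
import java.util.Map;

// Hypothetical diagnostic helper; not part of Storm, Curator, or ZooKeeper.
final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a short note if unknown. */
    static String locate(String className) {
        try {
            // 'false' = do not run static initializers; we only want to find the class.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Core JDK classes have no code source.
            return source == null ? "bootstrap/JDK" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not found";
        }
    }

    /** True if the class declares a constructor with exactly these parameter types. */
    static boolean hasConstructor(String className, Class<?>... parameterTypes) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            cls.getDeclaredConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above.
        String quorumMaj = "org.apache.zookeeper.server.quorum.flexible.QuorumMaj";
        String curator = "org.apache.curator.framework.CuratorFrameworkFactory";
        System.out.println("QuorumMaj loaded from:  " + locate(quorumMaj));
        System.out.println("QuorumMaj(Map) present: " + hasConstructor(quorumMaj, Map.class));
        System.out.println("Curator loaded from:    " + locate(curator));
    }
}
```

Run it from the same project, so that it sees the same classpath as the topology. If the constructor is reported missing, `mvn dependency:tree` shows which dependency brings in each of the two jars.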

Re: Storm bolt is not connected with Kafka look like some error

Explorer

And here is my POM:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.Testing</groupId>
  <artifactId>Testing</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <repositories>
    <repository>
      <releases>
        <enabled>true</enabled>
        <updatePolicy>always</updatePolicy>
        <checksumPolicy>warn</checksumPolicy>
      </releases>
      <snapshots>
        <enabled>false</enabled>
        <updatePolicy>never</updatePolicy>
        <checksumPolicy>fail</checksumPolicy>
      </snapshots>
      <id>HDPReleases</id>
      <name>HDP Releases</name>
      <url>http://repo.hortonworks.com/content/repositories/releases/</url>
      <layout>default</layout>
    </repository>
  </repositories> 
  
  
  
 <dependencies>


 	<dependency>
 		<groupId>org.apache.hadoop</groupId>
 		<artifactId>hadoop-core</artifactId>
 		<version>1.2.0.23</version>
 	</dependency>


 	<dependency>
 		<groupId>org.apache.storm</groupId>
 		<artifactId>storm-core</artifactId>
 		<version>0.9.3.2.2.5.1-3</version>
 	</dependency>




 	<dependency>
 		<groupId>org.apache.hbase</groupId>
 		<artifactId>hbase-client</artifactId>
 		<version>0.98.0.2.1.3.7-24-hadoop2</version>
 	</dependency>


 
	<dependency>
	    <groupId>jdk.tools</groupId>
	    <artifactId>jdk.tools</artifactId>
	    <version>1.7.0_05</version>
	    <scope>system</scope>
	    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	 
	 
	<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.10</artifactId>
	<version>0.8.2.2.3.0.0-2557</version>
	<exclusions>
	                <exclusion>
	                    <groupId>org.slf4j</groupId>
	                    <artifactId>slf4j-log4j12</artifactId>
	                </exclusion>
                  <exclusion>
                       <groupId>org.apache.zookeeper</groupId>
                       <artifactId>zookeeper</artifactId>
                  </exclusion>	                
	 </exclusions>
	</dependency>








	
	
	<dependency>
	    <groupId>org.apache.storm</groupId>
	    <artifactId>storm-kafka</artifactId>
	    <version>0.9.2-incubating</version>
	</dependency>






	 
 </dependencies>
  
  
  
  
  
  
  
  
  
</project>

Re: Storm bolt is not connected with Kafka look like some error

Mentor
@Shahzad Aslam

First, your hadoop-core dependency is wrong; replace it with hadoop-client version 2.7.1.2.3.0.0-2557 or higher.

storm-kafka is also available from the Hortonworks (HWX) repository; replace yours with that version. Here are some examples:

<hadoop-client.version>2.7.1.2.3.0.0-2557</hadoop-client.version>
<hbase.version>1.1.1.2.3.0.0-2557</hbase.version>
<kafka.version>0.8.2.2.3.0.0-2557</kafka.version>
<store-core.version>0.10.0.2.3.0.0-2557</store-core.version>
<storm-kafka.version>0.10.0.2.3.0.0-2557</storm-kafka.version>

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>${storm-kafka.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>${store-core.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
  <exclusions>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>${hbase.version}</version>
  <type>jar</type>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop-client.version}</version>
  <type>jar</type>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
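
The example versions above all end in the same 2.3.0.0-2557 suffix, while the POM in the question mixes several suffixes and one plain Apache artifact (storm-kafka 0.9.2-incubating). A rough sketch for spotting that kind of mix is below. It assumes, based only on the version strings seen in this thread, that an HDP artifact version is laid out as a 3-part upstream version, then a 4-part HDP release, then a build number; HdpVersion and hdpRelease are hypothetical names, not part of any Hortonworks or Maven tooling.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; the version layout below is an assumption.
final class HdpVersion {

    // Assumed layout: <3-part upstream version>.<4-part HDP release>-<build>
    // e.g. 0.10.0.2.3.0.0-2557 -> upstream 0.10.0, HDP 2.3.0.0, build 2557
    private static final Pattern LAYOUT =
            Pattern.compile("^(\\d+\\.\\d+\\.\\d+)\\.(\\d+\\.\\d+\\.\\d+\\.\\d+)-(\\d+)");

    /** Returns the HDP release embedded in the version, or null if the layout does not match. */
    static String hdpRelease(String version) {
        Matcher m = LAYOUT.matcher(version);
        // lookingAt() matches a prefix, so trailing qualifiers such as "-hadoop2" are tolerated.
        return m.lookingAt() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        // Artifact versions copied from the POM in the question.
        String[][] deps = {
            {"hadoop-core", "1.2.0.23"},
            {"storm-core", "0.9.3.2.2.5.1-3"},
            {"hbase-client", "0.98.0.2.1.3.7-24-hadoop2"},
            {"kafka_2.10", "0.8.2.2.3.0.0-2557"},
            {"storm-kafka", "0.9.2-incubating"},
        };
        for (String[] dep : deps) {
            String release = hdpRelease(dep[1]);
            System.out.println(dep[0] + " " + dep[1] + " -> "
                    + (release == null ? "no HDP release recognised" : "HDP " + release));
        }
    }
}
```

Under that assumption, the artifacts in the question resolve to three different HDP releases plus two versions that do not fit the layout at all, whereas the example properties all resolve to the same release.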

Re: Storm bolt is not connected with Kafka look like some error

Mentor

Sorry, the formatting is off.

Re: Storm bolt is not connected with Kafka look like some error

Mentor

@Shahzad Aslam

You are also submitting to a local cluster, so make sure your storm-core dependency has <scope>compile</scope>; when you submit to a real Storm cluster, change the scope to provided.

Re: Storm bolt is not connected with Kafka look like some error

Mentor

My recommendation is to try storm-starter from the Apache Storm project, at least to get comfortable with the API.

Re: Storm bolt is not connected with Kafka look like some error

Explorer

Artem, I have some experience working with topologies, and the architecture is not alien to me. I have developed and deployed some simple topologies; the only thing that is new to me is the Kafka spout.

Re: Storm bolt is not connected with Kafka look like some error

Explorer

I don't get it. Are you saying I should use hadoop-client instead of hadoop-core?

Re: Storm bolt is not connected with Kafka look like some error

Mentor

@Shahzad Aslam Yes. If you're using HDFS, you need hadoop-client. hadoop-core was the right package for Hadoop 1.0; in Hadoop 2.0 the client tools were moved to hadoop-client. Hadoop 1.2.0 is very old.

Re: Storm bolt is not connected with Kafka look like some error

Explorer

I made the changes and ran the code again, but I get the same error.

Can you please add me on Skype (shahzad.aslam100) and have a look at my code via screen sharing? I promise I will not disturb you a lot. This is actually an assignment from my potential new employer, and I need to get it working by the end of this weekend.

Shahzad