Created 01-21-2016 01:30 PM
I am trying to connect my Storm spout to Kafka and I am getting an error. Below is the code, followed by the error I get when I run this topology. I am running HDP 2.2.4.2-2 and compiling my code on Windows 7 with Eclipse.
package com.storm;

import java.util.UUID;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class storm123 {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // create an instance of TopologyBuilder class
        TopologyBuilder builder = new TopologyBuilder();

        BrokerHosts hosts = new ZkHosts("sandbox.hortonworks.com:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "testing2", "/testing2", UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set the spout class
        builder.setSpout("myKafkaSpout", kafkaSpout, 1);
        //builder.setSpout("LearningStormSpout", new stormspout(), 2);

        // set the bolt class
        builder.setBolt("LearningStormBolt", new stormbolt(), 4).shuffleGrouping("myKafkaSpout");

        Config conf = new Config();
        conf.setDebug(true);

        // create an instance of LocalCluster class for
        // executing topology in local mode.
        LocalCluster cluster = new LocalCluster();

        // LearningStormTopolgy is the name of the submitted topology
        cluster.submitTopology("LearningStormToplogy", conf, builder.createTopology());
        try {
            Thread.sleep(70000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the LearningStormTopology
        cluster.killTopology("LearningStormToplogy");
        // shutdown the storm test cluster
        cluster.shutdown();
    }
}
And here is the error I get at the end when I run this code:
20848 [Thread-16-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink] INFO backtype.storm.daemon.executor - Prepared bolt __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:(6)
20854 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
	at org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:54) ~[curator-framework-3.0.0.jar:na]
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:156) ~[curator-framework-3.0.0.jar:na]
	at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:136) ~[curator-framework-3.0.0.jar:na]
	at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:107) ~[curator-framework-3.0.0.jar:na]
	at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
	at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
	at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.executor$fn__4949$fn__4964.invoke(executor.clj:542) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at backtype.storm.util$async_loop$fn__452.invoke(util.clj:463) ~[storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
20855 [Thread-18-myKafkaSpout] ERROR backtype.storm.daemon.executor - java.lang.NoSuchMethodError: org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
	(same stack trace as above, repeated)
20900 [Thread-18-myKafkaSpout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:322) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$fn__5425$fn__5426.invoke(worker.clj:491) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at backtype.storm.daemon.executor$mk_executor_data$fn__4850$fn__4851.invoke(executor.clj:245) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at backtype.storm.util$async_loop$fn__452.invoke(util.clj:475) [storm-core-0.9.3.2.2.4.2-2.jar:0.9.3.2.2.4.2-2]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
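As a side note on reading this trace: the `NoSuchMethodError` points at `curator-framework-3.0.0.jar` calling a `QuorumMaj(Map)` constructor that only exists in ZooKeeper 3.5.x, while HDP 2.2 ships ZooKeeper 3.4.x. In other words, a Curator 3.x jar has leaked onto the classpath next to an older ZooKeeper. Running `mvn dependency:tree -Dincludes=org.apache.curator` shows which dependency pulls it in. One hedged way to work around it, assuming your ZooKeeper stays on the 3.4 line, is to pin Curator to a 2.x release in `dependencyManagement` (the exact 2.x version below is illustrative, not prescribed by this thread):

```xml
<dependencyManagement>
  <dependencies>
    <!-- Curator 3.x requires ZooKeeper 3.5; HDP 2.2 ships ZooKeeper 3.4.x,
         so force a Curator 2.x release (version here is illustrative) -->
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.7.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```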
Created 01-21-2016 01:31 PM
And here is my POM:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.Testing</groupId>
  <artifactId>Testing</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <repositories>
    <repository>
      <releases>
        <enabled>true</enabled>
        <updatePolicy>always</updatePolicy>
        <checksumPolicy>warn</checksumPolicy>
      </releases>
      <snapshots>
        <enabled>false</enabled>
        <updatePolicy>never</updatePolicy>
        <checksumPolicy>fail</checksumPolicy>
      </snapshots>
      <id>HDPReleases</id>
      <name>HDP Releases</name>
      <url>http://repo.hortonworks.com/content/repositories/releases/</url>
      <layout>default</layout>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.0.23</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3.2.2.5.1-3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.98.0.2.1.3.7-24-hadoop2</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7.0_05</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2.3.0.0-2557</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.2-incubating</version>
    </dependency>
  </dependencies>
</project>
Created 01-21-2016 02:19 PM
Firstly, your hadoop-core dependency is wrong; replace it with hadoop-client version 2.7.1.2.3.0.0-2557 or higher.
storm-kafka is also available from HWX; replace yours with that version. Here are some examples:
<hadoop-client.version>2.7.1.2.3.0.0-2557</hadoop-client.version>
<hbase.version>1.1.1.2.3.0.0-2557</hbase.version>
<kafka.version>0.8.2.2.3.0.0-2557</kafka.version>
<storm-core.version>0.10.0.2.3.0.0-2557</storm-core.version>
<storm-kafka.version>0.10.0.2.3.0.0-2557</storm-kafka.version>

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>${storm-kafka.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>${storm-core.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
  <exclusions>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>${hbase.version}</version>
  <type>jar</type>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop-client.version}</version>
  <type>jar</type>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Created 01-21-2016 02:19 PM
Sorry, the formatting is off.
Created 01-21-2016 02:21 PM
You are also submitting to a local cluster, so make sure your storm-core has <scope>compile</scope>; when you submit to a real Storm cluster, change the scope to provided.
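To make the scope point concrete, here is a minimal sketch of the storm-core dependency for local-mode runs; the `storm-core.version` property is assumed to be defined in your `<properties>` section:

```xml
<!-- LocalCluster runs need storm-core on the runtime classpath -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>${storm-core.version}</version>
  <!-- "compile" for local mode; switch to "provided" before submitting
       to a real Storm cluster, where the cluster supplies storm-core -->
  <scope>compile</scope>
</dependency>
```

With scope `provided`, the jar is available at compile time but excluded from the shaded topology jar, which avoids classpath clashes with the cluster's own storm-core.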
Created 01-21-2016 02:22 PM
My recommendation is to try storm-starter from the Apache Storm project, at least to get yourself comfortable with the API.
Created 01-21-2016 02:36 PM
Artem, I have some experience working with topologies, and the architecture is not alien to me; I have developed and deployed some simple topologies. The only thing alien to me is the Kafka spout.
Created 01-21-2016 02:37 PM
I don't get it. You are saying I should use hadoop-client instead of hadoop-core?
Created 01-21-2016 02:38 PM
@Shahzad Aslam yes. If you're using HDFS then you need to use hadoop-client; hadoop-core was the right artifact for Hadoop 1.x, and in Hadoop 2.x the client tools were moved to hadoop-client. Hadoop 1.2.0 is very old.
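The swap described above amounts to replacing one dependency block in the POM; the HDP build number is the one Artem quoted earlier, so adjust it to match your stack:

```xml
<!-- replaces the Hadoop 1.x hadoop-core artifact; version is the HDP
     2.3 build quoted in this thread, adjust to your HDP release -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.1.2.3.0.0-2557</version>
</dependency>
```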
Created 01-21-2016 02:56 PM
I made the changes and ran the code again, but I get the same error.
Can you please add me on Skype (shahzad.aslam100) and have a look at my code via screen sharing? I promise I will not disturb you a lot. Actually, this is an assignment from a potential new employer, and I need to make this work by the end of this weekend.
Shahzad