Created 06-29-2016 07:08 AM
I'm trying to implement a Kafka Spout inside a Storm Topology.
After looking at various tutorials this is the code I finished with:
import java.util.UUID;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;

//Create main class TestTopo submit topology.
public class TestTopo {
    public static void main(String[] args) throws Exception {
        String zkHost = "sandbox.hortonworks.com:2181";

        //Create Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig MyKafkaConfig = new SpoutConfig(new ZkHosts(zkHost), "topic", "", UUID.randomUUID().toString());
        builder.setSpout("MyKafkaSpout", new KafkaSpout(MyKafkaConfig));
        builder.setBolt("consoleOutputBolt", new ConsoleBolt())
            .shuffleGrouping("MyKafkaSpout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TestTopo", config, builder.createTopology());
        Thread.sleep(10000);

        //Stop the topology
        cluster.shutdown();
    }
}
The main problem I have is a dependency issue: the storm.kafka library does not seem to be available with the core Storm functionality, and it does not appear to be included in the Sandbox. Where can I find this implementation?
Thanks!
Created 06-29-2016 07:13 AM
Can you try adding this dependency to your pom.xml?
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.10.0.2.4.0.0-169</version>
</dependency>
Created 06-29-2016 08:25 AM
Thanks, this was a lot of help! I have to confess that I wasn't using Maven and had no idea it existed.
Now I have my Maven project working, and it seems to have been built with the appropriate dependencies (thanks for that). There is still an error when I try to run the topology:
[root@sandbox mytestapp]# java -cp target/mytestapp-1.0-SNAPSHOT.jar com.martinexsa.tigopoc.MyTestApp
Exception in thread "main" java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2625)
    at java.lang.Class.getMethod0(Class.java:2866)
    at java.lang.Class.getMethod(Class.java:1676)
    at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 6 more
The error shows a ClassNotFoundException for the storm.kafka.BrokerHosts dependency, although I do not directly import that class. I assume it is a dependency of the storm-kafka package but I'm not sure where to look for it.
Thanks,
Created 06-29-2016 08:52 AM
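It seems the required dependency is not available at runtime: the jar produced by a plain build contains only your own classes, not the libraries they depend on. Can you try running this after building a fat jar? To build a fat jar, add a build configuration to your pom.xml and build with mvn package.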
<build>
    <finalName>mytestapp</finalName>
    <plugins>
        <!-- Set a compiler level -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <!-- Maven Assembly Plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <!-- get all project dependencies -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <!-- MainClass in mainfest make a executable jar -->
                <archive>
                    <manifest>
                        <mainClass>HiveJdbcClient</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <!-- bind to the packaging phase -->
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
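If you want to confirm that the class really is missing from the runtime classpath, a small check along these lines may help. This is only a sketch: the class name `ClasspathCheck` and the method `isOnClasspath` are made up for illustration, and it uses nothing beyond the standard `Class.forName` call.

```java
public class ClasspathCheck {

    // Hypothetical helper (not part of Storm or Maven): returns true if the
    // named class can be located by the class loader that loaded this class.
    // Passing 'false' means the class is located but not initialized.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class is not on the classpath.
            // LinkageError (e.g. NoClassDefFoundError): one of its own
            // dependencies could not be loaded.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above.
        String name = args.length > 0 ? args[0] : "storm.kafka.BrokerHosts";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

Compiled into the project and run with the same `-cp` value as the failing command, it should report whether `storm.kafka.BrokerHosts` is visible to the JVM at that point.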
Created 06-29-2016 01:59 PM
Thanks Rajkumar,
I tried to read more about the build element of pom.xml and didn't understand much of it. I added the build configuration you posted to my pom.xml, but that didn't work. I placed this code after the `</dependencies>` tag, then ran `mvn package` and then `java -cp target/mytestapp-1.0-SNAPSHOT.jar com.mycompany.app.App`.
Is there something I'm missing? I keep receiving the same error. Here is my pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany.app</groupId>
    <artifactId>mytestapp</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>mytestapp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.10.0</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.10.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <!-- Build Settings -->
    <build>
        <finalName>empirix</finalName>
        <plugins>
            <!-- Set a compiler level -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <!-- MainClass in mainfest make a executable jar -->
                    <archive>
                        <manifest>
                            <mainClass>HiveJdbcClient</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Created 06-29-2016 05:35 PM
@Luis Valdeavellano After running mvn package, there should be a jar file named empirix-jar-with-dependencies.jar created inside the target folder. Run this jar instead of mytestapp-1.0-SNAPSHOT.jar.
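To double-check that the fat jar actually bundles the missing class, something like the sketch below could be used. The class `JarContents` and its methods are hypothetical names for illustration; the only library calls are the standard `java.util.jar.JarFile` constructor and `getEntry`. The default jar path assumes the `finalName` of `empirix` from your pom.xml.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarContents {

    // Converts a fully qualified class name into the entry path used inside
    // a jar, e.g. "a.b.C" becomes "a/b/C.class".
    public static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath contains an entry for the class.
    public static boolean jarContainsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryNameFor(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults taken from this thread; override via arguments.
        String jarPath = args.length > 0 ? args[0] : "target/empirix-jar-with-dependencies.jar";
        String className = args.length > 1 ? args[1] : "storm.kafka.BrokerHosts";
        System.out.println(className + " in " + jarPath + ": " + jarContainsClass(jarPath, className));
    }
}
```

If the class shows up in the fat jar but not in the plain one, that would explain why only the fat jar runs without the NoClassDefFoundError.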
Created 06-30-2016 06:16 AM
Thanks, that did it.