Created 06-29-2016 07:08 AM
I'm trying to implement a Kafka spout inside a Storm topology.
After looking at various tutorials, this is the code I ended up with:
import java.util.UUID;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;
// Main class TestTopo: builds the topology and submits it to a local cluster.
public class TestTopo {
    public static void main(String[] args) throws Exception {
        String zkHost = "sandbox.hortonworks.com:2181";

        // Create Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig MyKafkaConfig = new SpoutConfig(new ZkHosts(zkHost), "topic", "", UUID.randomUUID().toString());
        builder.setSpout("MyKafkaSpout", new KafkaSpout(MyKafkaConfig));
        // ConsoleBolt is my own bolt class (not shown here)
        builder.setBolt("consoleOutputBolt", new ConsoleBolt())
               .shuffleGrouping("MyKafkaSpout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TestTopo", config, builder.createTopology());
        Thread.sleep(10000);

        // Stop the topology
        cluster.shutdown();
    }
}
The main problem is a dependency issue with the storm.kafka library, which does not seem to be available with the core Storm functionality. It looks like it is not included in the Sandbox. Where can I find this implementation?
Thanks!
Created 06-29-2016 07:13 AM
Can you try adding this dependency to your pom.xml?
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.10.0.2.4.0.0-169</version>
</dependency>
Created 06-29-2016 08:25 AM
Thanks, this was a lot of help! I have to confess that I wasn't using Maven and had no idea it even existed.
Now I have my Maven project working, and it seems to have been built with the appropriate dependencies (thanks for that). There is still an error when I try to run the topology:
[root@sandbox mytestapp]# java -cp target/mytestapp-1.0-SNAPSHOT.jar com.martinexsa.tigopoc.MyTestApp
Exception in thread "main" java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2625)
at java.lang.Class.getMethod0(Class.java:2866)
at java.lang.Class.getMethod(Class.java:1676)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more
The error shows a ClassNotFoundException for the storm.kafka.BrokerHosts dependency, although I do not directly import that class. I assume it is a dependency of the storm-kafka package but I'm not sure where to look for it.
Thanks,
Created 06-29-2016 08:52 AM
It seems a required dependency is not available at runtime. Can you try running this after building a fat jar? To build a fat jar, add the following build configuration to your pom.xml and build with mvn package:
<build>
    <finalName>mytestapp</finalName>
    <plugins>
        <!-- Set the compiler level -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <!-- Maven Assembly Plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.4.1</version>
            <configuration>
                <!-- Bundle all project dependencies -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <!-- mainClass in the manifest makes the jar executable; replace it with your own main class -->
                <archive>
                    <manifest>
                        <mainClass>HiveJdbcClient</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <!-- Bind to the package phase -->
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
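If you want to confirm whether a class is actually visible at runtime before and after switching jars, a small check along these lines may help. This is only a diagnostic sketch: the `ClasspathCheck` class and its `isOnClasspath` method are names made up for this example and are not part of Storm or Maven. It simply asks the class loader for the class named in your stack trace.

```java
// Diagnostic sketch (hypothetical helper, not part of Storm):
// reports whether a class can be loaded from the current classpath.
final class ClasspathCheck {

    // Returns true if the named class is visible to this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its own dependencies is missing
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the class from the stack trace in this thread
        String name = args.length > 0 ? args[0] : "storm.kafka.BrokerHosts";
        System.out.println(name + (isOnClasspath(name) ? " is" : " is NOT") + " on the classpath");
    }
}
```

Compile it into the project and run it with the same `-cp` value you use for the topology. If it reports the class as missing, the jar on the classpath does not bundle storm-kafka.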
Created 06-29-2016 01:59 PM
Thanks Rajkumar,
I tried to read more about the build element in pom.xml and didn't understand much of it. I added the build configuration you posted to my pom.xml, but that didn't work. I placed it right after the `</dependencies>` tag, then ran `mvn package` and then `java -cp target/mytestapp-1.0-SNAPSHOT.jar com.mycompany.app.App`.
Is there something I'm missing? I keep receiving the same error. Here is my pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany.app</groupId>
<artifactId>mytestapp</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>mytestapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.10.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.10.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<!-- Build Settings -->
<build>
<finalName>empirix</finalName>
<plugins>
<!-- Set a compiler level -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<!-- Maven Assembly Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- mainClass in the manifest makes the jar executable -->
<archive>
<manifest>
<mainClass>HiveJdbcClient</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Created 06-29-2016 05:35 PM
@Luis Valdeavellano After running mvn package, there should be a jar file named empirix-jar-with-dependencies.jar in the target folder. Run this jar instead of mytestapp-1.0-SNAPSHOT.jar.
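To double-check that the fat jar really bundles the class from your stack trace, you could inspect the jar directly. The following is a rough sketch using only the standard `java.util.jar` API; `JarContains` and `containsClass` are hypothetical names chosen for this example, and the default path assumes the `finalName` from your pom.xml.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Sketch (hypothetical helper): checks whether a jar contains a given class file.
final class JarContains {

    // Returns true if the jar at jarPath has an entry for the fully qualified class name.
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Class files are stored under their package path, e.g. storm/kafka/BrokerHosts.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are taken from this thread; adjust them to your own build
        String jarPath = args.length > 0 ? args[0] : "target/empirix-jar-with-dependencies.jar";
        String className = args.length > 1 ? args[1] : "storm.kafka.BrokerHosts";
        boolean found = containsClass(jarPath, className);
        System.out.println(className + (found ? " found in " : " missing from ") + jarPath);
    }
}
```

Running it against both jars in the target folder should show the difference: the plain mytestapp-1.0-SNAPSHOT.jar is expected to hold only your own classes, while the jar-with-dependencies one should also contain the storm-kafka classes.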
Created 06-30-2016 06:16 AM
Thanks, that did it.