Created 02-16-2016 04:54 PM
I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
at kafka.utils.Logging$class.$init$(Logging.scala:29)
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
at net.mynet.ReadFileConsumer.main(ReadFileConsumer.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 15 more
Here are my dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>jmxri</artifactId>
                <groupId>com.sun.jmx</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jms</artifactId>
                <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jmxtools</artifactId>
                <groupId>com.sun.jdmk</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>
</dependencies>

And here is my consumer code:

package net.mynet;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.concurrent.*;

public class ReadFileConsumer {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("zookeeper.connect", "192.168.60.128:2181");
        config.put("group.id", "default");
        config.put("partition.assignment.strategy", "roundrobin");
        config.put("bootstrap.servers", "192.168.60.128:9092");
        config.put("zookeeper.session.timeout.ms", "400");
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(config);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get("test");
        KafkaStream<byte[], byte[]> stream = streamList.get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            System.out.println(new String(iterator.next().message()));
        }
    }

    public static void processRecords(Map<String, ConsumerRecords<String, String>> records) {
        ArrayList arr = new ArrayList();
        List<ConsumerRecord<String, String>> messages = records.get("test").records();
        if (messages != null) {
            for (ConsumerRecord<String, String> next : messages) {
                try {
                    arr.add(next.value());
                    System.out.println(next.value());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            SparkConf conf = new SparkConf().setAppName("Consumer").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> distData = sc.parallelize(arr);
            SQLContext sqlContext = new SQLContext(sc);
            DataFrame df = sqlContext.createDataFrame(distData, null);
        } else {
            System.out.println("No messages");
        }
    }
}
Created 02-16-2016 04:56 PM
Please add the HDP repositories to your pom, and make sure to use dependencies for our versions of Spark and Kafka; 0.8.1 is old.
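For example, the public Hortonworks repository can be added to the pom roughly like this (the repository id and URL here are assumptions; check the HDP docs for the repository matching your HDP version):
<repositories>
    <repository>
        <id>hortonworks-public</id>
        <!-- URL is an assumption; use the repository listed in the HDP docs for your version -->
        <url>http://repo.hortonworks.com/content/groups/public/</url>
    </repository>
</repositories>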
Created 02-16-2016 05:09 PM
@Artem Ervits My Spark version is 1.4.1 and my Kafka is 0.8.2.2, so I did add the right versions. I still have the same error.
Created 02-17-2016 08:46 PM
Not sure which IDE you're using, but with NetBeans you can add the base repository and then use "find dependencies", which will search Maven Central. You can find all the deps that way. @hoda moradi
Created 02-17-2016 06:33 PM
Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark. I still get this error: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.
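For reference, even something as minimal as this fails with the same error for me; the ZooKeeper address, group id, and topic name below are just placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("group.id", "test-group");              // placeholder group id

        // The ConsumerConfig constructor is already where the NoClassDefFoundError is thrown
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

        // Read one stream from the "test" topic and print each message
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}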
Created 02-17-2016 06:41 PM
@hoda moradi What HDP version are you using? I use this http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_installing_manually_book/content/ch_insta...
Created 02-17-2016 06:54 PM
The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.
Created 02-17-2016 07:04 PM
@hoda moradi I have used this in the past http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/
It has Kafka code too. Creative ideas...
Created 02-20-2016 11:34 PM
I solved that error by adding this dependency to my project.
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>