Created 02-16-2016 04:54 PM
I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
    at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
    at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
    at kafka.utils.Logging$class.$init$(Logging.scala:29)
    at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
    at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
    at net.mynet.ReadFileConsumer.main(ReadFileConsumer.java:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 15 more

Here are my dependencies:

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
      <exclusion>
        <artifactId>jmxri</artifactId>
        <groupId>com.sun.jmx</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jms</artifactId>
        <groupId>javax.jms</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jmxtools</artifactId>
        <groupId>com.sun.jdmk</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
  </dependency>
</dependencies>
package net.mynet;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.concurrent.*;

public class ReadFileConsumer {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("zookeeper.connect", "192.168.60.128:2181");
        config.put("group.id", "default");
        config.put("partition.assignment.strategy", "roundrobin");
        config.put("bootstrap.servers", "192.168.60.128:9092");
        config.put("zookeeper.session.timeout.ms", "400");
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(config);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get("test");
        KafkaStream<byte[], byte[]> stream = streamList.get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            System.out.println(new String(iterator.next().message()));
        }
    }

    public static void processRecords(Map<String, ConsumerRecords<String, String>> records) {
        ArrayList arr = new ArrayList();
        List<ConsumerRecord<String, String>> messages = records.get("test").records();
        if (messages != null) {
            for (ConsumerRecord<String, String> next : messages) {
                try {
                    arr.add(next.value());
                    System.out.println(next.value());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            SparkConf conf = new SparkConf().setAppName("Consumer").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> distData = sc.parallelize(arr);
            SQLContext sqlContext = new SQLContext(sc);
            DataFrame df = sqlContext.createDataFrame(distData, null);
        } else {
            System.out.println("No messages");
        }
    }
}
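Side note: the createDataFrame(distData, null) call at the end is only a placeholder and would not work with a null bean class even once the classpath problem is fixed. An untested sketch of what that step could look like with an explicit schema (the column name "message" is made up here) is:

// Untested sketch: convert the JavaRDD<String> into Rows and pass an explicit
// single-column schema instead of a null bean class. The column name "message"
// is a placeholder. Requires additional imports: org.apache.spark.sql.Row,
// org.apache.spark.sql.RowFactory, org.apache.spark.sql.types.*,
// org.apache.spark.api.java.function.Function.
JavaRDD<Row> rows = distData.map(new Function<String, Row>() {
    public Row call(String value) {
        return RowFactory.create(value);
    }
});
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("message", DataTypes.StringType, true)
});
DataFrame df = sqlContext.createDataFrame(rows, schema);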
Created 02-16-2016 04:56 PM
Please add the HDP repositories to your pom and make sure to add dependencies for our versions of Spark and Kafka; 0.8.1 is old.
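For example, a typical way to declare the Hortonworks releases repository in the pom looks like the snippet below (the URL is the usual public HDP repo; adjust it if your environment uses a local mirror):

<repositories>
  <repository>
    <id>hortonworks-releases</id>
    <!-- usual public HDP repository; replace with your local mirror if needed -->
    <url>http://repo.hortonworks.com/content/repositories/releases/</url>
  </repository>
</repositories>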
Created 02-16-2016 05:09 PM
@Artem Ervits My Spark version is 1.4.1 and my Kafka is 0.8.2.2, and I added the right versions. I still have the same error.
Created 02-17-2016 08:46 PM
Not sure which IDE you're using, but with NetBeans you can add the base repository and then use "find dependencies", which searches Maven Central. You can find all the deps that way. @hoda moradi
Created 02-17-2016 06:33 PM
Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark. I still get this error: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.
Created 02-17-2016 06:41 PM
@hoda moradi What HDP version are you using? I use this http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_installing_manually_book/content/ch_insta...
Created 02-17-2016 06:54 PM
The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.
Created 02-17-2016 07:04 PM
@hoda moradi I have used this in the past http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/
It has Kafka code too.
Created 02-20-2016 11:34 PM
I solved that error by adding this dependency to my project.
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
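If anyone hits the same thing, one way to double-check that log4j actually ends up on the classpath is Maven's dependency plugin, for example:

mvn dependency:tree -Dincludes=log4j:log4j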