Support Questions


I just started learning Kafka and Spark. I am trying to run a simple Kafka consumer program that saves the data into a Spark DataFrame, but I get the following error.

Expert Contributor

I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
at kafka.utils.Logging$class.$init$(Logging.scala:29)
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
at net.mynet.ReadFileConsumer.main(ReadFileConsumer.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 15 more
Here are my dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>com.sun.jmx</groupId>
                <artifactId>jmxri</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.jms</groupId>
                <artifactId>jms</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jdmk</groupId>
                <artifactId>jmxtools</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>
</dependencies>
And here is my code:

package net.mynet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ReadFileConsumer {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("zookeeper.connect", "192.168.60.128:2181");
        config.put("group.id", "default");
        config.put("partition.assignment.strategy", "roundrobin");
        // bootstrap.servers and the deserializer settings belong to the new
        // consumer API; the old high-level consumer used below ignores them.
        config.put("bootstrap.servers", "192.168.60.128:9092");
        config.put("zookeeper.session.timeout.ms", "400");
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Old high-level consumer: open one stream on the "test" topic.
        ConsumerConfig consumerConfig = new ConsumerConfig(config);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);

        // Block on the stream and print each message as it arrives.
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            System.out.println(new String(iterator.next().message()));
        }
    }

    public static void processRecords(Map<String, ConsumerRecords<String, String>> records) {
        List<String> arr = new ArrayList<String>();
        List<ConsumerRecord<String, String>> messages = records.get("test").records();
        if (messages != null) {
            for (ConsumerRecord<String, String> next : messages) {
                try {
                    arr.add(next.value());
                    System.out.println(next.value());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            SparkConf conf = new SparkConf().setAppName("Consumer").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> distData = sc.parallelize(arr);
            SQLContext sqlContext = new SQLContext(sc);
            // createDataFrame needs a schema or a bean class (passing null fails),
            // so wrap each message in a Row with a single string column.
            StructType schema = DataTypes.createStructType(Collections.singletonList(
                    DataTypes.createStructField("value", DataTypes.StringType, true)));
            JavaRDD<Row> rows = distData.map(new Function<String, Row>() {
                public Row call(String message) {
                    return RowFactory.create(message);
                }
            });
            DataFrame df = sqlContext.createDataFrame(rows, schema);
        } else {
            System.out.println("No messages");
        }
    }
}


1 ACCEPTED SOLUTION

Expert Contributor

I solved that error by adding this dependency to my project.

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>


REPLIES

Master Mentor
@hoda moradi

Please add the HDP repositories to your pom, and make sure to use dependencies for our versions of Spark and Kafka; 0.8.1 is old.

https://community.hortonworks.com/questions/626/how-do-i-resolve-maven-dependencies-for-building-a.h...
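
For example, a sketch of the relevant pom additions (the repository URL is the public Hortonworks releases repo as I remember it, and the versions here are placeholders; match them to your HDP release):

<repositories>
    <repository>
        <id>hortonworks-releases</id>
        <url>http://repo.hortonworks.com/content/repositories/releases/</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <!-- the _2.10 suffix must match the scala-library version you declare -->
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>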

Expert Contributor

@Artem Ervits My Spark version is 1.4.1 and my Kafka is 0.8.2.2, and I added the right versions. I still have the same error.
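
A standard way to double-check which versions Maven actually puts on the classpath is the dependency tree:

mvn dependency:tree

Look for the kafka_* and scala-library entries; if kafka_2.9.2 shows up next to scala-library 2.10.x, the Scala versions are mismatched, which is one classic source of NoClassDefFoundError for scala.reflect classes.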

Master Mentor

Not sure which IDE you're using, but with NetBeans you can add the base repository and then use "find dependencies," which searches Maven Central. You can find all the deps that way. @hoda moradi


Expert Contributor

Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark. I still get this error: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.
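
As a sanity check that takes the Java classpath out of the picture entirely, the console consumer that ships with Kafka can read the same topic (the path assumes a stock 0.8.x layout, and the broker/topic values are the ones from the code above):

bin/kafka-console-consumer.sh --zookeeper 192.168.60.128:2181 --topic test --from-beginning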


Expert Contributor

The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.

Master Mentor

@hoda moradi I have used this in the past http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/

It has Kafka code too. Creative ideas...
