
Rising Star

I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer which saves the data in a Spark DataFrame, but I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
at kafka.utils.Logging$class.$init$(Logging.scala:29)
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
at net.mynet.ReadFileConsumer.main(ReadFileConsumer.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 15 more
Here are my dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
      <exclusion>
        <groupId>com.sun.jmx</groupId>
        <artifactId>jmxri</artifactId>
      </exclusion>
      <exclusion>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jdmk</groupId>
        <artifactId>jmxtools</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
  </dependency>
</dependencies>
And here is my code:

package net.mynet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ReadFileConsumer {

  public static void main(String[] args) {
    Properties config = new Properties();
    config.put("zookeeper.connect", "192.168.60.128:2181");
    config.put("group.id", "default");
    config.put("partition.assignment.strategy", "roundrobin");
    config.put("bootstrap.servers", "192.168.60.128:9092");
    config.put("zookeeper.session.timeout.ms", "400");
    config.put("zookeeper.sync.time.ms", "200");
    config.put("auto.commit.interval.ms", "1000");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // High-level (0.8.x) consumer: open one stream on the "test" topic.
    ConsumerConfig consumerConfig = new ConsumerConfig(config);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("test", 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get("test");
    KafkaStream<byte[], byte[]> stream = streamList.get(0);

    // Block on the stream and print every message as it arrives.
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    while (iterator.hasNext()) {
      System.out.println(new String(iterator.next().message()));
    }
  }

  // Note: this method is not called from main in this snippet.
  public static void processRecords(Map<String, ConsumerRecords<String, String>> records) {
    ArrayList<String> arr = new ArrayList<String>();
    List<ConsumerRecord<String, String>> messages = records.get("test").records();
    if (messages != null) {
      for (ConsumerRecord<String, String> next : messages) {
        try {
          arr.add(next.value());
          System.out.println(next.value());
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
      // Load the collected values into a local Spark context and build a DataFrame.
      SparkConf conf = new SparkConf().setAppName("Consumer").setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<String> distData = sc.parallelize(arr);
      SQLContext sqlContext = new SQLContext(sc);
      DataFrame df = sqlContext.createDataFrame(distData, null);
    } else {
      System.out.println("No messages");
    }
  }
}
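
One thing worth flagging in the snippet above: in Spark 1.4, SQLContext.createDataFrame(JavaRDD, Class) expects a Java bean class describing the rows, so the createDataFrame(distData, null) call will fail at runtime even after the classpath error is solved. A minimal sketch, assuming a hypothetical Message bean (the name and shape are illustrative, not from the original code):

// Hypothetical bean; Spark reflects on the getter/setter pair to derive the schema.
public static class Message implements java.io.Serializable {
  private String value;
  public Message() {}
  public Message(String value) { this.value = value; }
  public String getValue() { return value; }
  public void setValue(String value) { this.value = value; }
}

// Wrap each raw string in the bean and pass the bean class to Spark.
JavaRDD<Message> rows = distData.map(
    new org.apache.spark.api.java.function.Function<String, Message>() {
      public Message call(String s) { return new Message(s); }
    });
DataFrame df = sqlContext.createDataFrame(rows, Message.class);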


1 ACCEPTED SOLUTION

Rising Star

I solved that error by adding this dependency to my project.

<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>
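
If the same trace comes back even with log4j on the classpath, it is also worth checking the Scala side: kafka_2.9.2 is built against Scala 2.9, where scala/reflect/ClassManifest still exists as a class, while the POM above ships scala-library 2.10.4, where it does not. You can see which Scala artifacts actually resolve with mvn dependency:tree -Dincludes=org.scala-lang. A hedged alternative, assuming the Scala 2.10 build of the Kafka client is acceptable for your cluster, is to align the two:

<!-- Assumption: kafka_2.10 0.8.2.2 from Maven Central, matching the
     scala-library 2.10.4 already declared in this POM. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.2</version>
</dependency>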


9 REPLIES

Mentor
@hoda moradi

Please add the HDP repositories to your POM, and make sure to add dependencies for our versions of Spark and Kafka; 0.8.1 is old.

https://community.hortonworks.com/questions/626/how-do-i-resolve-maven-dependencies-for-building-a.h...
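
For reference, a sketch of the repository entry (the URL is an assumption based on the public Hortonworks releases repository; confirm it for your HDP version):

<repositories>
  <repository>
    <id>hortonworks-releases</id>
    <url>http://repo.hortonworks.com/content/repositories/releases/</url>
  </repository>
</repositories>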

Rising Star

@Artem Ervits My Spark version is 1.4.1 and my Kafka is 0.8.2.2, and I added the right versions. I still get the same error.

Mentor

Not sure which IDE you're using, but with NetBeans you can add the base repository and then use Find Dependencies, which searches Maven Central. You can find all the dependencies that way. @hoda moradi

Rising Star

Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark; I still get the error java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.

Rising Star

The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.

@hoda moradi I have used this in the past: http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/

It has Kafka code too.

