I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.


Rising Star

I get the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
at kafka.utils.Logging$class.$init$(Logging.scala:29)
at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:24)
at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:78)
at net.mynet.ReadFileConsumer.main(ReadFileConsumer.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 15 more
Here are my dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>com.sun.jmx</groupId>
                <artifactId>jmxri</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.jms</groupId>
                <artifactId>jms</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jdmk</groupId>
                <artifactId>jmxtools</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>
</dependencies>
And here is my code:

package net.mynet;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ReadFileConsumer {

    public static void main(String[] args) {
        // Configuration for the old high-level (ZooKeeper-based) consumer
        Properties config = new Properties();
        config.put("zookeeper.connect", "192.168.60.128:2181");
        config.put("group.id", "default");
        config.put("partition.assignment.strategy", "roundrobin");
        config.put("bootstrap.servers", "192.168.60.128:9092");
        config.put("zookeeper.session.timeout.ms", "400");
        config.put("zookeeper.sync.time.ms", "200");
        config.put("auto.commit.interval.ms", "1000");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ConsumerConfig consumerConfig = new ConsumerConfig(config);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // Ask for one stream for the "test" topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get("test").get(0);

        // Print each message as it arrives
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            System.out.println(new String(iterator.next().message()));
        }
    }

    // Helper for the new-consumer API; not invoked from main() above
    public static void processRecords(Map<String, ConsumerRecords<String, String>> records) {
        List<String> arr = new ArrayList<String>();
        List<ConsumerRecord<String, String>> messages = records.get("test").records();
        if (messages != null) {
            for (ConsumerRecord<String, String> next : messages) {
                try {
                    arr.add(next.value());
                    System.out.println(next.value());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            SparkConf conf = new SparkConf().setAppName("Consumer").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> distData = sc.parallelize(arr);
            SQLContext sqlContext = new SQLContext(sc);
            // NOTE: the second argument is meant to be a JavaBean class describing
            // the rows; passing null will fail at runtime
            DataFrame df = sqlContext.createDataFrame(distData, null);
        } else {
            System.out.println("No messages");
        }
    }
}


1 ACCEPTED SOLUTION

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Rising Star

I solved that error by adding this dependency to my project.

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
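If the error comes back after adding it, running mvn dependency:tree is a quick way to confirm which log4j and scala-library versions actually end up on the classpath.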


9 REPLIES

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Mentor
@hoda moradi

Please add the HDP repositories to your pom, and make sure to add dependencies for our versions of Spark and Kafka; 0.8.1 is old.

https://community.hortonworks.com/questions/626/how-do-i-resolve-maven-dependencies-for-building-a.h...
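For reference, a minimal sketch of what registering the repository in the pom might look like (the id is arbitrary, and the URL is the commonly documented Hortonworks releases repository; double-check both against the article linked above):

<repositories>
    <repository>
        <id>hortonworks-releases</id> <!-- arbitrary id -->
        <url>http://repo.hortonworks.com/content/repositories/releases/</url>
    </repository>
</repositories>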

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Rising Star

@Artem Ervits My Spark version is 1.4.1 and my Kafka is 0.8.2.2, and I added the right versions. I still have the same error.

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Mentor

Not sure which IDE you're using, but with NetBeans you can add the base repository and then use "find dependencies", which searches Maven Central. You can find all the dependencies that way. @hoda moradi

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Rising Star

Thank you for your response. I realized that I cannot even run a simple Kafka consumer program, even without using Spark; I still get the error java.lang.NoClassDefFoundError: scala/reflect/ClassManifest. Do you have any idea why? I checked that all of my versions are correct.
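(A hedged aside: this particular error usually points to a Scala version mismatch rather than a missing download. The kafka_2.9.2 artifact in the pom above is built against Scala 2.9, while scala-library 2.10.4 is on the classpath, and as far as I can tell scala.reflect.ClassManifest no longer exists as a standalone class file in Scala 2.10. A sketch of the aligned dependency, using the Scala 2.10 build of Kafka published on Maven Central:)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId> <!-- Scala 2.10 build, matching scala-library 2.10.4 -->
    <version>0.8.2.2</version>
</dependency>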

Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

Rising Star

The HDP version is the latest one, 2.3.2. I use that too, but when I try to run a simple consumer program in Java, it does not work. I use the kafka.consumer package in my code. My producer code works fine; now I want to get that data and do some analysis on it in my consumer code.


Re: I just started learning Kafka and Spark. I am trying to run a simple program for a Kafka consumer that saves data in a Spark DataFrame, but I get the following error.

@hoda moradi I have used this in the past http://hortonworks.com/hadoop-tutorial/ingesting-processing-real-time-events-apache-storm/

It has Kafka code too. Creative ideas ...
