Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant

Cloudera Quickstart VM 5.8 - Kafka kafka_2.10-0.8.2.0-kafka-1.4.0.jar - IllegalArgumentException


Dear All,

 

On Mac -> VirtualBox -> Cloudera Quickstart VM 5.8

 

Installed Kafka using packages from the command line (following the Cloudera Kafka installation document, i.e. yum install kafka)

 

Changed /etc/yum.repos.d/cloudera-kafka.repo to use "baseurl=http://archive.cloudera.com/kafka/redhat/6/x86_64/kafka/1/"
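For reference, the edited repo file should look roughly like this (a sketch: only the baseurl is taken from this post; the section name, name field, and gpgcheck setting are assumptions based on the usual yum .repo layout):

```
[cloudera-kafka]
name=Cloudera Kafka
baseurl=http://archive.cloudera.com/kafka/redhat/6/x86_64/kafka/1/
gpgcheck=1
```

After editing the file, run "yum clean all" so the new baseurl takes effect before "yum install kafka".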

 

Developed a Spark Kafka Streaming Consumer. My maven dependencies are

 

<spark.version>1.6.1</spark.version>
<scala.minor.version>2.10</scala.minor.version>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>

 

 

Issue:

 

I keep getting the error below in my Spark-Kafka consumer code:

 

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.JavaConverters._

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingCtx, config.asScala.toMap, kafkaTopics.toSet)

// Note: count() on a DStream returns another DStream, so it has to be
// materialized per batch rather than concatenated into a log message.
stream.count().print()

stream.foreachRDD { rdd =>
  rdd.collect.foreach(x => logger.info("KAFKA_VALUE_IS " + x))
}

 

I have only 1 partition and 1 broker in my Cloudera Quickstart 5.8 VM

 

Please help me resolve this issue.

 

Thanks,

Training My Hobby

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, quickstart.cloudera):
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:267)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)

1 ACCEPTED SOLUTION


I was finally able to resolve this issue: it was caused by incorrect Kafka dependencies in my pom.xml. The kafka-clients jar that spark-streaming-kafka pulls in transitively (Apache Kafka 0.8.2.x) does not match the Cloudera Kafka broker (0.9.0-kafka-2.0.1) running on the VM, which is what triggers the IllegalArgumentException while parsing the fetch response.

 

This is what I did:

1. Created a new Cloudera Quickstart VM 5.8

 

2. Installed Kafka by following the commands mentioned in https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html

 

Read the guidance in that link on which Maven dependency to use carefully. In the pom.xml below, the transitive kafka-clients dependency is excluded and the matching Cloudera build is added from the Cloudera repository instead; you can confirm which version Maven actually resolves with mvn dependency:tree.

 

3. My Maven pom.xml has the configuration below

 

Note the kafka-clients exclusion in the spark-streaming-kafka dependency.

 

 

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

 

<properties>
  <java.version>1.8</java.version>
  <scala.minor.version>2.10</scala.minor.version>
  <scala.complete.version>${scala.minor.version}.6</scala.complete.version>
  <spark.version>1.6.1</spark.version>
</properties>

.....

 

<dependencies>

  ...............

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
    <version>${spark.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0-kafka-2.0.1</version>
  </dependency>

  ...............

</dependencies>

 

Hope this helps others.

 

Thanks,

TrainingMyHobby

 

