Cloudera Quickstart VM 5.8 - Kafka kafka_2.10-0.8.2.0-kafka-1.4.0.jar - IllegalArgumentException

Dear All,

 

Setup: Mac -> VirtualBox -> Cloudera Quickstart VM 5.8

I installed Kafka from packages on the command line, following the Cloudera Kafka installation document (i.e. yum install kafka).

I changed /etc/yum.repos.d/cloudera-kafka.repo to use "baseurl=http://archive.cloudera.com/kafka/redhat/6/x86_64/kafka/1/"

I then developed a Spark Kafka Streaming consumer. My Maven dependencies are:

 

<spark.version>1.6.1</spark.version>

<scala.minor.version>2.10</scala.minor.version>

 

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>

 

 

Issue:

 

I keep getting the error shown at the end of this post from my Spark-Kafka code:

 

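import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingCtx, config.asScala.toMap, kafkaTopics.toSet)

// stream.count() returns a DStream[Long], not a number, so the count is logged per batch instead
stream.foreachRDD { rdd =>
  logger.info("Count in batch is " + rdd.count())
  rdd.collect.foreach(x => logger.info("KAFKA_VALUE_IS " + x))
}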

 

I have only 1 partition and 1 broker in my Cloudera Quickstart 5.8 VM.

Please help me resolve the issue below.

 

Thanks,

Training My Hobby

 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, quickstart.cloudera):
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:267)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)

1 ACCEPTED SOLUTION

I was finally able to resolve this issue; it was caused by incorrect Kafka dependencies in my pom.xml.

This is what I did:

1. Created a new Cloudera Quickstart VM 5.8

 

2. Installed Kafka by following the commands mentioned in https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html

 

Read the page linked above carefully for the Maven dependency that has to be used. Also see the pom.xml below, where kafka-clients is excluded from spark-streaming-kafka and added back from the Cloudera repository.

 

3. In my Maven pom.xml I have the configuration below.

Note the kafka-clients exclusion in the spark-streaming-kafka dependency.

 

 

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

<properties>
  <java.version>1.8</java.version>
  <scala.minor.version>2.10</scala.minor.version>
  <scala.complete.version>${scala.minor.version}.6</scala.complete.version>
  <spark.version>1.6.1</spark.version>
</properties>

.....

 

<dependencies>

...............

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
    <version>${spark.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
      </exclusion>
    </exclusions>
  </dependency>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0-kafka-2.0.1</version>
  </dependency>

...............

</dependencies>
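
One way to check which Kafka jars actually end up on the runtime classpath after a pom.xml change like this is to ask the JVM where each class was loaded from. The following is a minimal sketch and not part of the original fix: the object name KafkaJarCheck and its helpers jarOf and locate are hypothetical names chosen for illustration. The two class names are kafka.consumer.SimpleConsumer (taken from the stack trace in the question) and org.apache.kafka.clients.consumer.KafkaConsumer (assumed here as a representative class from the kafka-clients artifact). It uses only standard JVM calls, so it also runs without Kafka on the classpath and then simply reports the classes as not found.

```scala
import scala.util.Try

// Hypothetical helper for illustration; not from the original post.
object KafkaJarCheck {

  /** Location (jar or directory) a class was loaded from, if the JVM knows it. */
  def jarOf(cls: Class[_]): Option[String] =
    for {
      domain   <- Option(cls.getProtectionDomain)
      source   <- Option(domain.getCodeSource) // null for JDK bootstrap classes
      location <- Option(source.getLocation)
    } yield location.toString

  /** Looks a class up by name without initializing it; None if it is not on the classpath. */
  def locate(className: String): Option[String] =
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption.flatMap(jarOf)

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "kafka.consumer.SimpleConsumer",                  // appears in the stack trace
      "org.apache.kafka.clients.consumer.KafkaConsumer" // assumed representative of kafka-clients
    )
    names.foreach { name =>
      println(name + " -> " + locate(name).getOrElse("not found (or no code source)"))
    }
  }
}
```

Running this with the same classpath as the Spark job prints the jar each class resolves to, which makes it easier to see whether the kafka-clients version on the classpath is the one declared in the pom.xml.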

 

I hope this helps others.

 

Thanks,

TrainingMyHobby

 
