Created on 11-17-2016 10:38 PM - edited 09-16-2022 03:48 AM
Dear All,
On Mac -> VirtualBox -> Cloudera Quickstart VM 5.8
Installed Kafka from packages on the command line (following the Cloudera Kafka installation document, i.e. yum install kafka)
Changed the /etc/yum.repos.d/cloudera-kafka.repo to use "baseurl=http://archive.cloudera.com/kafka/redhat/6/x86_64/kafka/1/"
Developed a Spark Kafka streaming consumer. My Maven dependencies are:
<spark.version>1.6.1</spark.version>
<scala.minor.version>2.10</scala.minor.version>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
Issue:
I keep getting the error below from my Spark-Kafka code:
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingCtx, config.asScala.toMap, kafkaTopics.toSet)
logger.info("Count in stream is " + stream.count())
stream.foreachRDD { rdd =>
  rdd.collect.foreach(x => logger.info("KAFKA_VALUE_IS " + x))
}
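For context, here is roughly the full setup around that snippet (a minimal sketch; the broker address quickstart.cloudera:9092, the topic name test-topic, and the object name are placeholders for illustration, not my exact values):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-direct-stream-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list and topic for the Quickstart VM.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
    val topics = Set("test-topic")

    // Direct (receiver-less) stream of (key, value) pairs.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      rdd.collect().foreach { case (_, value) => println("KAFKA_VALUE_IS " + value) }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}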
I have only 1 partition and 1 broker in my Cloudera Quickstart 5.8 VM
Please help me resolve the issue below.
Thanks,
Training My Hobby
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, quickstart.cloudera):
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:267)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Created 11-18-2016 07:50 PM
Finally I was able to resolve this issue - it was caused by improper use of the Kafka dependencies in my pom.xml.
This is what I did:
1. Created a new Cloudera Quickstart VM 5.8
2. Installed Kafka by following the commands mentioned in https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html
In the link above, read carefully the section on which Maven dependency to use. Also check the pom.xml below, where kafka-clients is excluded from spark-streaming-kafka and added back from the Cloudera repository.
3. In my Maven pom.xml I have the configuration below.
Note the kafka-clients exclusion when declaring spark-streaming-kafka.
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

<properties>
  <java.version>1.8</java.version>
  <scala.minor.version>2.10</scala.minor.version>
  <scala.complete.version>${scala.minor.version}.6</scala.complete.version>
  <spark.version>1.6.1</spark.version>
</properties>
.....
<dependencies>
...............
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_${scala.minor.version}</artifactId>
    <version>${spark.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0-kafka-2.0.1</version>
  </dependency>
...............
</dependencies>
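As an extra sanity check (this is just a sketch I am adding for illustration, not something the fix requires), you can print which jar the kafka-clients classes are actually loaded from; after the exclusion it should point at the 0.9.0-kafka-2.0.1 artifact from the Cloudera repository rather than the 0.8.2 client pulled in transitively by spark-streaming-kafka:

import org.apache.kafka.clients.producer.KafkaProducer

object KafkaClientCheck {
  def main(args: Array[String]): Unit = {
    // Resolve the jar file that provides the kafka-clients classes at runtime.
    val location = classOf[KafkaProducer[_, _]].getProtectionDomain.getCodeSource.getLocation
    println("kafka-clients loaded from: " + location)
  }
}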
Hope this helps others.
Thanks,
TrainingMyHobby