
run-example streaming.KafkaWordCount fails on CDH 5.7.0


New Contributor

 

Hi,

 

After upgrading to CDH 5.7.0, we are having trouble getting Kafka to work with Spark Streaming.

We're running Kafka 0.8.2.2.

 

The example jar doesn’t work:

 

/opt/cloudera/parcels/CDH/lib/spark/bin/run-example streaming.KafkaWordCount ….

 

Attached is a log file; the relevant warnings are:

 

16/05/04 10:06:23 WARN consumer.ConsumerFetcherThread: [ConsumerFetcherThread-wordcount1_host81436-cld.domain.com-1462349182172-13b9f2e7-0-2], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@47fb7e6d. Possible cause: java.nio.BufferUnderflowException

16/05/04 10:06:24 WARN consumer.ConsumerFetcherThread: [ConsumerFetcherThread-wordcount1_host81436-cld.domain.com-1462349182172-13b9f2e7-0-1], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@73b9a762. Possible cause: java.lang.IllegalArgumentException

 

We have no problem running this from my development PC; it only fails in the production CDH 5.7.0 environment.

 

 

Any ideas?

 

Thanks,

Michel


Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Master Collaborator

For CDH 5.7, you must use Kafka 0.9 (the "2.0.x" KAFKA parcel). This is because Spark Streaming here is built for the new 0.9 consumer API, which won't work with older versions.
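
If the application depends on Kafka directly, the matching 0.9 client is published to the Cloudera repository. A minimal sketch (the version string here is inferred from the parcel's jar naming quoted later in this thread, so verify it against the repository):

        <!-- Cloudera's Kafka 0.9 client, matching the "2.0.x" KAFKA parcel.
             Version string inferred from the CDH jar names; confirm before use. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0-kafka-2.0.0</version>
        </dependency>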


Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Contributor

Kafka in CDH 5.7 is v2.0 of the parcel (Kafka 0.9.0), and there is a known issue stating that you should use Kafka no older than v0.9.0 with Spark in CDH 5.7. You can find it here.

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

New Contributor

 

 

But if CDH 5.7.0 can only work with Kafka 0.9.x, how can I use spark-streaming-kafka? The Spark documentation states that Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1 (it does not mention 0.9.x).

 

Is spark-streaming-kafka compatible with CDH 5.7.0?

 

A custom Spark application, which only has problems since the upgrade to 5.7.0, has the following dependencies:

 

 

    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>


        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>



        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>

 

I generate an uber-jar and submit my Spark application this way:

 

spark-submit --class SPARKAPP \
  --name SPARKAPP \
  --master yarn \
  --deploy-mode client \
  ./uber-SPARKAPP-1.0-SNAPSHOT.jar
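
(For reference, an uber-jar like this is typically produced with the maven-shade-plugin; a minimal sketch follows. The plugin version and finalName are illustrative, since the actual build configuration is not shown in this thread.)

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <!-- build the shaded jar during "mvn package" -->
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- produces uber-SPARKAPP-1.0-SNAPSHOT.jar -->
                            <finalName>uber-SPARKAPP-1.0-SNAPSHOT</finalName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>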

When I look at the debug logging (YARN AM launch context / SPARK_DIST_CLASSPATH), I see Kafka 0.9.0 being referenced:

/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/kafka-clients-0.9.0-kafka-2.0.0.jar
/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/kafka_2.10-0.9.0-kafka-2.0.0.jar

 

Eventually I get this exception.

 

16/05/09 10:07:21 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, bfravicsvr81439-cld.opentsp.com): java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:267)
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

 

How can I build an Apache Spark application using spark-streaming-kafka that runs on CDH 5.7.0?

 

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Master Collaborator

The CDH distro of Spark Streaming is built to work with the Kafka 0.9 APIs. If your app doesn't use Kafka directly, this will be transparent and won't matter to you, because your app would mark Spark as 'provided' and would not bring in its own copy. If you do use Kafka directly, you'll need to depend on the Kafka API yourself, and in that case you will need to refer to the 0.9 artifacts, because that is what is provided at runtime. You can also get away with building against the CDH spark-streaming artifacts, since you'll transitively get the 0.9 Kafka APIs without additional configuration.
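
A minimal sketch of that last option (the repository URL and version string are the ones published in Cloudera's repository):

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

        <!-- Building against the CDH artifact pulls in the matching Kafka 0.9
             client transitively, so no explicit Kafka dependency is needed. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>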

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

New Contributor

 

Building against the CDH 5.7.0 artifacts, I get the same error as on the cluster.

 

 

 

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>


    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>


        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>



        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>

 

 

I get an exception in Buffer.limit because newLimit is far larger than the buffer's capacity, which suggests the consumer is misreading a size field in the broker's response.

 

public final Buffer limit(int newLimit) {         // newLimit: 1397966657
    if ((newLimit > capacity) || (newLimit < 0))  // capacity: 1016447
        throw new IllegalArgumentException();
    limit = newLimit;
    if (position > limit) position = limit;
    if (mark > limit) mark = -1;
    return this;
}

 

I think there's no way around upgrading to Kafka 0.9, but will spark-streaming-kafka 1.6.1 be compatible with Kafka 0.9?

 

 

 

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Master Collaborator

I'm not sure if this is the problem, but all of those dependencies in your POM should be "provided" scope. You're also mixing CDH and non-CDH components, and while often that does not matter, you might try just relying on the spark-streaming artifact (not kafka) to see if that works and therefore narrows down the issue.
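
Concretely, a sketch of that change against the POM above (the "provided" scopes are the suggested addition):

        <dependency> <!-- on the cluster classpath at runtime; keep out of the uber-jar -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- The explicit org.apache.kafka kafka-clients / kafka_2.10 dependencies
             can then be dropped: spark-streaming-kafka brings in a matching
             0.9 client transitively, avoiding a pinned, mismatched version. -->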

 

Yes, it works with Kafka 0.9, and if you're using Kafka directly you must use 0.9.

 

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

New Contributor

OK, we now have Kafka 0.9.

 

Please tell me how to start run-example streaming.KafkaWordCount in a way that should work.

 

I run:

/opt/cloudera/parcels/CDH/lib/spark/bin/run-example streaming.KafkaWordCount ...

 

But I still get the same errors.

 

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Master Collaborator

I'm not sure you addressed the other problems I highlighted above, like having non-provided dependencies and mixing different versions.

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

New Contributor
But I want to use the provided example "streaming.KafkaWordCount", which should not have any issues, right?