New Contributor
Posts: 5
Registered: ‎05-04-2016

run-example streaming.KafkaWordCount fails on CDH 5.7.0

 

Hi,

 

After upgrading to CDH 5.7.0 we are having trouble with the Kafka to Spark Streaming integration.

We're running Kafka 0.8.2.2.

 

The example jar doesn’t work:

 

/opt/cloudera/parcels/CDH/lib/spark/bin/run-example streaming.KafkaWordCount ….

 

Attached is a log file; the key warnings are:

 

16/05/04 10:06:23 WARN consumer.ConsumerFetcherThread: [ConsumerFetcherThread-wordcount1_host81436-cld.domain.com-1462349182172-13b9f2e7-0-2], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@47fb7e6d. Possible cause: java.nio.BufferUnderflowException

16/05/04 10:06:24 WARN consumer.ConsumerFetcherThread: [ConsumerFetcherThread-wordcount1_host81436-cld.domain.com-1462349182172-13b9f2e7-0-1], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@73b9a762. Possible cause: java.lang.IllegalArgumentException

 

We have no problem running this from my development PC; it only fails in the production CDH 5.7.0 environment.

 

Any ideas?

 

Thanks,

Michel

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

For CDH 5.7, you must use Kafka 0.9 (the "2.0.x" KAFKA parcel). This is because Spark Streaming in CDH 5.7 is built against the 0.9 consumer API, which won't work with older Kafka versions.

Cloudera Employee
Posts: 35
Registered: ‎04-05-2016

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

Kafka in CDH 5.7 is v2.0 (Kafka 0.9.0), and there is a documented known issue stating that you should use Kafka 0.9.0 or later with Spark in CDH 5.7. You can find it in the CDH 5.7 known issues documentation.

New Contributor
Posts: 5
Registered: ‎05-04-2016

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

 

 

But if CDH 5.7.0 only works with Kafka 0.9.x, how can I use spark-streaming-kafka? The Spark documentation states that Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1, without mentioning 0.9.x.

 

Is spark-streaming-kafka compatible with CDH 5.7.0?

 

A custom Spark application, which has only had problems since the upgrade to 5.7.0, has the following dependencies:

 

 

    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.1</version>
        </dependency>


        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>



        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>

 

I generate an uber-jar and submit my Spark application this way:

 

spark-submit --class SPARKAPP \
 --name SPARKAPP \
 --master yarn \
 --deploy-mode client \
 ./uber-SPARKAPP-1.0-SNAPSHOT.jar
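
The uber-jar comes from a maven-shade-plugin configuration along these lines (a sketch only; the plugin version and finalName here are illustrative assumptions, since the build section isn't included above):

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <!-- run the shade goal during `mvn package` to produce the uber-jar -->
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>uber-SPARKAPP-1.0-SNAPSHOT</finalName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>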

When I look at the debug logging (the YARN AM launch context / SPARK_DIST_CLASSPATH), I see Kafka 0.9.0 jars being referenced:

/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/kafka-clients-0.9.0-kafka-2.0.0.jar
/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/jars/kafka_2.10-0.9.0-kafka-2.0.0.jar

 

Eventually I get this exception:

 

16/05/09 10:07:21 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, bfravicsvr81439-cld.opentsp.com): java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:267)
        at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100)
        at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

 

How can I build an Apache Spark application using spark-streaming-kafka that runs on CDH 5.7.0?

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

The CDH distribution of Spark Streaming is built to work with the Kafka 0.9 APIs. If your app doesn't use Kafka directly, this is transparent and won't matter to you, because your app should mark Spark as 'provided' and so won't bring in its own copy. If you do use Kafka directly, you'll need to depend on the Kafka API no matter what, and in that case you will need to refer to the 0.9 artifacts, because that is what is provided at runtime. You can also get away with building against the CDH spark-streaming artifacts, since you'll transitively get the 0.9 Kafka APIs without additional configuration.
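
For example, a minimal sketch of the dependency entries this implies (version strings assume the CDH 5.7.0 artifacts from the Cloudera repository; adjust to your parcel version):

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
            <!-- supplied by the cluster at runtime, so don't bundle it -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
            <!-- transitively brings in the 0.9-based Kafka client APIs; no explicit kafka dependency needed -->
        </dependency>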

New Contributor
Posts: 5
Registered: ‎05-04-2016

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

 

With the CDH 5.7.0 artifacts I get the same error on my development PC as on the cluster.

 

 <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>


    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0-cdh5.7.0</version>
        </dependency>


        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>



        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark_2.10</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>

 

 

I get an exception in Buffer.limit because newLimit is far too big:

 

public final Buffer limit(int newLimit) {           // newLimit: 1397966657
    if ((newLimit > capacity) || (newLimit < 0))    // capacity: 1016447
        throw new IllegalArgumentException();
    limit = newLimit;
    if (position > limit) position = limit;
    if (mark > limit) mark = -1;
    return this;
}

 

The garbage newLimit value suggests the consumer is misreading the layout of the fetch response, i.e. a client/broker protocol version mismatch. I think there's no way around upgrading to Kafka 0.9, but will spark-streaming-kafka 1.6.1 be compatible with Kafka 0.9?

 

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

I'm not sure if this is the problem, but all of those dependencies in your POM should be "provided" scope. You're also mixing CDH and non-CDH components; while that often doesn't matter, you might try relying on just the spark-streaming artifact (not kafka) to see if that works, which would narrow down the issue.

 

Yes, it works with Kafka 0.9, and if you're using Kafka directly you must use 0.9.

 

New Contributor
Posts: 2
Registered: ‎05-10-2016

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

I have exactly the same problem.

 

Has anyone been able to run a Kafka consumer on CDH 5.7.0?

 

New Contributor
Posts: 5
Registered: ‎05-04-2016

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

OK, we now have Kafka 0.9.

 

Please tell me how to start the streaming.KafkaWordCount example in a way that should work.

 

I run:

/opt/cloudera/parcels/CDH/lib/spark/bin/run-example streaming.KafkaWordCount ...

 

But I still get the same errors.

 

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: run-example streaming.KafkaWordCount fails on CDH 5.7.0

I'm not sure you addressed the other problems I highlighted above, like having non-provided dependencies and mixing different versions.
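
For reference, the Spark 1.6 example expects the ZooKeeper quorum, consumer group, topic list, and thread count as its arguments, so a working invocation looks roughly like this (the host, group, and topic names below are placeholders):

/opt/cloudera/parcels/CDH/lib/spark/bin/run-example streaming.KafkaWordCount zkhost1:2181 wordcount-group test 1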