Oryx2 Kafka Broker Issue

Explorer

Hi, I'm attempting to get the ALS example in Oryx2 up and running on an AWS EMR cluster.

I've been using the requirements matrix here: http://oryx.io/docs/admin.html

(Aside: I see Oryx 2.4.x in the matrix, but I don't see a 2.4.x release here? https://github.com/OryxProject/oryx/releases)

My Kafka/Zookeeper hosts are using the following versions:

Oryx 2.3.0
kafka_2.11-0.9.0.1
spark-2.0.1-bin-hadoop2.7

I can run

./oryx-run.sh kafka-setup

and create the topics successfully.

I can also run

./oryx-run.sh kafka-input --input-file data.csv

and see the messages being added to the topic.

My cluster is using the following versions:

Hadoop 2.7.3
Spark 2.0.1

I've also included the following jars on the cluster (master and slave nodes):

kafka_2.11-0.9.0.1/libs/*
spark-streaming-kafka-0-8_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar

According to https://spark.apache.org/docs/2.0.1/streaming-kafka-integration.html,
spark-streaming-kafka-0-8 is suitable for broker version "0.8.2.1 or higher". 
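
For what it's worth, rather than copying jars onto every node, I believe spark-submit can also resolve the integration artifact (and its matching transitive Kafka client) from Maven Central via --packages. A rough sketch along the lines of the launch described below:

# Let spark-submit pull spark-streaming-kafka-0-8 and its transitive Kafka
# 0.8.2.1 client, instead of staging the jars on each node by hand.
spark-submit \
  --master yarn --deploy-mode cluster \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 \
  --class com.cloudera.oryx.batch.Main \
  oryx-batch-2.3.0.jar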

I have adapted the spark-submit launch for the Batch layer into an EMR "step". The only change I've had to make is switching --deploy-mode from client to cluster.

aws emr add-steps --cluster-id XXX --steps Type=spark,Name=OryxBatchLayer-ALSExample,Args=[--master,yarn,--deploy-mode,cluster,--name,OryxBatchLayer-ALSExample,--class,com.cloudera.oryx.batch.Main,--files,s3://mybucket/oryx.conf,--driver-memory,1g,--driver-java-options,"-Dconfig.file=oryx.conf",--executor-memory,4g,--executor-cores,8,--conf,spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf",--conf,spark.ui.port=4040,--conf,spark.io.compression.codec=lzf,--conf,spark.logConf=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.speculation=true,--conf,spark.ui.showConsoleProgress=false,--num-executors=4,s3://mybucket/oryx-batch-2.3.0.jar],ActionOnFailure=CONTINUE

I then see the following error in the container:

17/04/18 14:15:00 ERROR JobScheduler: Error generating jobs for time 1492524900000 ms
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)



Apparently this is caused by a mismatch with Kafka/Spark versions, but from what I can see I have followed the recommendations. Any ideas?
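
In case it's relevant, here's a rough check (paths as listed above) that I can run on the nodes to see which jar actually supplies the kafka.cluster.Broker classes involved in the cast:

# List any kafka/cluster/Broker* classes found in each candidate jar.
for jar in kafka_2.11-0.9.0.1/libs/*.jar spark-streaming-kafka-0-8_2.11-2.0.1.jar; do
  matches=$(unzip -l "$jar" 2>/dev/null | grep -E 'kafka/cluster/Broker(EndPoint)?\.class')
  if [ -n "$matches" ]; then
    echo "== $jar"
    echo "$matches"
  fi
done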

1 ACCEPTED SOLUTION

Master Collaborator

First, I'll tell you that this is Quite Complicated and confuses me too. Matching Spark and Kafka versions is tricky, exacerbated by multiple and incompatible Kafka APIs, multiplied by slight differences in which versions are shipped in which CDH package.

(Yes, there is no 2.4 yet; I put it in the matrix as a 'preview'.)

I recall that I was actually not able to get the 2.3 release to pass tests against upstream components, which is why it builds with the CDH profile enabled by default. I wanted to move on to Kafka 0.9 to enable security support, but that is supported by Spark 2.x's kafka-0-10 integration component, which wasn't yet available for CDH because it didn't work with CDH's Kafka 0.9. The kafka-0-8 component did work there, but then didn't work when enabled against the standard Spark 2 distro. This is a nightmarish no-man's-land of version combos.

However, master (2.4) should be clearer, since it moves on to Kafka 0.10. It does work, or at least passes tests, against Spark 2.1 and Kafka 0.10. In fact, I have a to-do to update the CDH dependencies too, to get it working in master.

So: if you're making your own build for non-CDH components, can you try building the 2.4 SNAPSHOT from master? If that works, I can hurry up getting the CDH part updated so we can cut a 2.4.0 release.
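
If it helps, the build itself is just the standard Maven build from a checkout of master; a minimal sketch, assuming master currently carries the 2.4.0 snapshot version:

# Build the Oryx batch/speed/serving binaries from master, skipping tests.
git clone https://github.com/OryxProject/oryx.git
cd oryx
mvn -DskipTests package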

27 REPLIES

Master Collaborator

(BTW, I went ahead and made a 2.4.0 release to have something official and probably-working out there. It worked on my CDH 5.11 + Spark 2.1 + Kafka 0.10.0 cluster.)

Yes, that's a minor problem in the log message; it should reference newYTYSolver. I'll fix that, but it shouldn't otherwise affect anything.

Explorer

Hi srowen,

I have a general question about the ALS recommender. Let's say that instead of a specific user rating, I'd like to recommend based on a usage count, i.e. every time a user interacts with an item, I submit a new ingestion: userID,itemID (with this type of input, the "strength" is assigned a default value of 1).

I've seen that if I submit multiple requests like this for the same userID,itemID combo, then /recommend/userID/?considerKnownItems=true will return itemID with a higher recommended score.

Is this based on a cumulative "strength", i.e. multiple "ratings" of the same item adding up, or is it based on the fact that the item was "rated" more recently than other items? Or is it something else?

Alternatively, on ingestion, would it make sense to get the current "rating" of an item by a user and increment it by 1 before submitting?

Does my use case make sense, or would it be better suited to a different type of algorithm?

Thanks

Master Collaborator

(Please start a new thread)

Yes, all scores are cumulative, added across all input. I am not sure what your use case is, but what you're suggesting is how it works: submitting (user, item, 1) adds 1 to the total strength of interaction.
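
Concretely, a sketch against the serving layer's REST API (the host, port, and the u1/item42 IDs are placeholders):

# Two ingests of the same pair; with no explicit strength column, each
# contributes the default strength of 1, for a cumulative total of 2.
curl -X POST http://your-serving-host:8080/ingest --data-binary 'u1,item42'
curl -X POST http://your-serving-host:8080/ingest --data-binary 'u1,item42'

# After the next model update, the score for item42 reflects the summed strength.
curl 'http://your-serving-host:8080/recommend/u1?considerKnownItems=true'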

New Contributor

Hi! Can you build a parcel for Kafka 0.10.1.1? I have the same problem, but I can't switch the client app to 0.10.0.0; I can only upgrade the broker side.

Master Collaborator

2.4.x should work, does it not? Or do you need 0.10.1.1+ for compatibility with the 0.10.1.1 broker?

New Contributor

I'll try to explain: we have a Kafka broker in a Cloudera installation, version 0.10.0.0, and some apps with kafka-client 0.10.1.1. When I try to connect, I see a "bootstrap disconnected" error. If I downgrade the kafka-client in an app to 0.10.0.x, the connection works fine, but in my situation I can't change the already-installed apps (with 0.10.1.1). If I understand correctly, though, upgrading the Kafka broker in the Cloudera cluster to 0.10.1.1 would make everything work. I think all I need is a Kafka parcel with version 0.10.1.x. Or am I missing something?

Master Collaborator

So, from other sources, I see notes about the incompatibility. It sounds like the 0.10.2 release was fixed to be compatible across maintenance releases? So if the project used 0.10.2, I think that would work for you and all 0.10.x brokers?
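
One quick way to sanity-check that from the client side (the broker address and topic are placeholders) is to point a 0.10.2.x console producer at the older broker:

# A compatible client should produce without the 'bootstrap disconnected' error.
echo test-message | kafka_2.11-0.10.2.1/bin/kafka-console-producer.sh \
  --broker-list your-broker:9092 --topic your-topic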

New Contributor

It is working! Thank you a lot; the 0.10.2.x kafka-client connects fine to 0.10.1.x and 0.10.0.x brokers 🙂

Master Collaborator

Hm, I don't recall seeing the 'disconnected' message. Is there more detail?

On its face, it seems like the serving layer can't see the broker? Do some ports need to be opened?