Created on 04-18-2017 07:46 AM - edited 09-16-2022 04:28 AM
Hi, I'm attempting to get the ALS example in Oryx2 up and running using an AWS EMR cluster.
I've been using the requirements matrix here: http://oryx.io/docs/admin.html
(Aside: I see Oryx 2.4.x in the matrix but don't see a 2.4.x release here? https://github.com/OryxProject/oryx/releases)
My Kafka/Zookeeper hosts are using the following versions:
Oryx 2.3.0
kafka_2.11-0.9.0.1
spark-2.0.1-bin-hadoop2.7
I can run
./oryx-run.sh kafka-setup
and create the topics successfully.
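As a sanity check, the topics can be listed with Kafka's own tooling, along these lines (OryxInput and OryxUpdate are Oryx's default topic names, and zk-host is a placeholder for my ZooKeeper host):
# List topics to confirm kafka-setup created them (placeholder ZK host)
kafka_2.11-0.9.0.1/bin/kafka-topics.sh --zookeeper zk-host:2181 --list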
I can also run
./oryx-run.sh kafka-input --input-file data.csv
and see the messages being added to the topic.
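(One way to confirm this, sketched with a placeholder host and Oryx's default input topic name, is to tail the topic with the Kafka 0.9 console consumer while kafka-input runs:)
# Watch the input topic as data.csv is sent
kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper zk-host:2181 --topic OryxInput --from-beginning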
My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1
I've also included the following jars on the cluster (master and slave nodes):
kafka_2.11-0.9.0.1/libs/*
spark-streaming-kafka-0-8_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar
According to here:
https://spark.apache.org/docs/2.0.1/streaming-kafka-integration.html
spark-streaming-kafka-0-8 is suitable for broker version "0.8.2.1 or higher".
I have adapted the spark-submit launch for the Batch layer into an EMR "step". The only change I've had to make is switching --deploy-mode from client to cluster.
aws emr add-steps --cluster-id XXX --steps Type=spark,Name=OryxBatchLayer-ALSExample,Args=[--master,yarn,--deploy-mode,cluster,--name,OryxBatchLayer-ALSExample,--class,com.cloudera.oryx.batch.Main,--files,s3://mybucket/oryx.conf,--driver-memory,1g,--driver-java-options,"-Dconfig.file=oryx.conf",--executor-memory,4g,--executor-cores,8,--conf,spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf",--conf,spark.ui.port=4040,--conf,spark.io.compression.codec=lzf,--conf,spark.logConf=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.speculation=true,--conf,spark.ui.showConsoleProgress=false,--num-executors=4,s3://mybucket/oryx-batch-2.3.0.jar],ActionOnFailure=CONTINUE
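(For readability, this step corresponds to roughly the following plain spark-submit; same arguments, just reformatted, with file paths assuming the staged copies on the cluster:)
spark-submit \
  --master yarn --deploy-mode cluster \
  --name OryxBatchLayer-ALSExample \
  --class com.cloudera.oryx.batch.Main \
  --files oryx.conf \
  --driver-memory 1g \
  --driver-java-options "-Dconfig.file=oryx.conf" \
  --executor-memory 4g --executor-cores 8 \
  --conf spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf" \
  --conf spark.ui.port=4040 \
  --conf spark.io.compression.codec=lzf \
  --conf spark.logConf=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.speculation=true \
  --conf spark.ui.showConsoleProgress=false \
  --num-executors 4 \
  oryx-batch-2.3.0.jar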
I then see the following error in the container:
17/04/18 14:15:00 ERROR JobScheduler: Error generating jobs for time 1492524900000 ms
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
Apparently this is caused by a mismatch with Kafka/Spark versions, but from what I can see I have followed the recommendations. Any ideas?
Created 04-18-2017 10:36 AM
First, I'll tell you that this is Quite Complicated and confuses me too. Matching Spark and Kafka versions is tricky, exacerbated by multiple and incompatible Kafka APIs, and multiplied by slight differences in which versions ship in which CDH package.
(Yes there is no 2.4 yet, I put it in there as a 'preview'.)
I recall that I was actually not able to get the 2.3 release to pass tests against upstream components, which is why it builds with the CDH profile enabled by default. I wanted to move on to Kafka 0.9 to enable security support, but that is supported by Spark 2.x's kafka-0_10 integration component, which wasn't yet usable on CDH because it didn't work with CDH's Kafka 0.9. The kafka-0_8 component did work there, but that same component then didn't work when enabled against the standard Spark 2 distro. It's a nightmarish no-man's-land of version combos.
However, master (2.4) should be clearer since it moves on to Kafka 0.10. It does work, or at least passes tests, against Spark 2.1 and Kafka 0.10. In fact, I have a to-do to update the CDH dependencies too so that it works in master.
So: if you're making your own build for non-CDH components, can you try building the 2.4-SNAPSHOT from master? If that works, I can hurry up getting the CDH part updated so we can cut a 2.4.0 release.
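(For anyone trying this, a minimal sketch of such a build, assuming a standard Maven toolchain; exact flags may vary:)
git clone https://github.com/OryxProject/oryx.git
cd oryx
# Build all modules without running the test suite
mvn -DskipTests clean package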
Created 04-24-2017 09:40 AM
(BTW I went ahead and made a 2.4.0 release to have something official and probably-working out there. It worked on my CDH 5.11 + Spark 2.1 + Kafka 0.10.0 cluster.)
Yes, that's a minor problem in the log message; it should reference newYTYSolver. I'll fix that, but it shouldn't otherwise affect anything.
Created 06-27-2017 05:00 AM
Hi srowen,
I have a general question about the ALS recommender. Say that instead of an explicit user rating, I'd like to recommend based on a usage count, i.e. every time a user interacts with an item, I submit a new ingestion: userID,itemID (with this type of input, the "strength" is assigned a default value of 1).
I've seen that if I submit multiple requests like this for the same userID,itemID combo, then /recommend/userID/?considerKnownItems=true will return itemID with a higher recommendation score.
Is this based on a cumulative "strength" i.e. multiple "ratings" of the same item adding up, or is it based on the fact that the item was "rated" more recently than other items? Or is it something else?
Alternatively, on ingestion, would it make sense to get the current "rating" of an item by a user and increment that by 1 before submitting it?
Does my use case make sense or would it be suited to a different type of algorithm?
Thanks
Created 06-27-2017 05:04 AM
(Please start a new thread)
Yes, all scores are cumulative, added across all input. I'm not sure what your use case is, but what you're suggesting is how it already works: submitting (user, item, 1) adds 1 to the total strength of the interaction.
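(Concretely, as a sketch against the serving layer's REST API; the host and port here are placeholders:)
# Each POST of "user,item" contributes strength 1 for that pair;
# repeating it adds to the cumulative strength.
curl -X POST -d "someUser,someItem" http://serving-host:8080/ingest
curl -X POST -d "someUser,someItem" http://serving-host:8080/ingest
# The repeated pair should now score higher here:
curl "http://serving-host:8080/recommend/someUser?considerKnownItems=true"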
Created 06-27-2017 06:18 AM
Apologies, new thread created: https://community.cloudera.com/t5/Data-Science-and-Machine/Oryx2-ALS-recommender/m-p/56566
Created on 05-21-2017 09:05 AM - edited 05-21-2017 09:05 AM
Hi! Can you build a parcel for Kafka 0.10.1.1? I have the same problem, but I can't switch the client app to 0.10.0.0; I can only upgrade the broker side.
Created 05-21-2017 09:41 AM
2.4.x should work, does it not? Or do you need 0.10.1.1+ for compatibility with the 0.10.1.1 broker?
Created 05-21-2017 11:16 AM
Let me try to explain: we have a Kafka broker in a Cloudera installation, version 0.10.0.0, and some apps using kafka-client 0.10.1.1. When I try to connect, I see a "bootstrap disconnected" error. If I downgrade the app's kafka-client to 0.10.0.x, the connection works fine, but in my situation I can't change the already-installed apps (on 0.10.1.1). If I understand correctly, upgrading the Kafka broker in the Cloudera cluster to 0.10.1.1 should make everything work. I think all I need is a Kafka parcel at version 0.10.1.x. Or am I missing something?
Created 05-22-2017 02:13 AM
From other sources, I see notes about this incompatibility. It sounds like the 0.10.2 release was fixed to be compatible across maintenance releases? So if the project used 0.10.2, I think that would work for you and for all 0.10.x brokers?
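(To sanity-check that, a sketch using the console consumer shipped with a 0.10.2.x Kafka download; the broker host and topic name are placeholders:)
# A 0.10.2 client reading one message from an older 0.10.x broker
kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server broker-host:9092 --topic someTopic --max-messages 1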
Created 05-22-2017 03:21 AM
It is working! Thank you a lot; the 0.10.2.x kafka-client connects fine to 0.10.1.x and 0.10.0.x brokers 🙂
Created 04-19-2017 10:55 AM
Hm, I don't recall seeing the 'disconnected' message. Is there more detail?
On its face it seems like the serving layer can't see the broker? Do some ports need to be opened?
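(A quick connectivity sketch from the serving-layer host, with a placeholder broker host and the usual Kafka port:)
# Can the serving layer reach the broker's listener at all?
nc -vz broker-host 9092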