Created on 04-18-2017 07:46 AM - edited 09-16-2022 04:28 AM
Hi, I'm attempting to get the ALS example in Oryx2 up and running using an AWS EMR cluster.
I've been using the requirements matrix here: http://oryx.io/docs/admin.html
(Aside: I see Oryx 2.4.x in the matrix but don't see a 2.4.x release here? https://github.com/OryxProject/oryx/releases)
My Kafka/Zookeeper hosts are using the following versions:
Oryx 2.3.0
kafka_2.11-0.9.0.1
spark-2.0.1-bin-hadoop2.7
I can run
./oryx-run.sh kafka-setup
and create the topics successfully.
I can also run
./oryx-run.sh kafka-input --input-file data.csv
and see the messages being added to the topic.
My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1
I've also included the following jars on the cluster (master and slave nodes):
kafka_2.11-0.9.0.1/libs/*
spark-streaming-kafka-0-8_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar
According to here:
https://spark.apache.org/docs/2.0.1/streaming-kafka-integration.html
spark-streaming-kafka-0-8 is suitable for broker version "0.8.2.1 or higher".
I have adapted the spark-submit launch for the Batch layer into an EMR "step". The only difference I've had to make is to change "deploy-mode" from client to cluster.
aws emr add-steps --cluster-id XXX --steps Type=spark,Name=OryxBatchLayer-ALSExample,Args=[--master,yarn,--deploy-mode,cluster,--name,OryxBatchLayer-ALSExample,--class,com.cloudera.oryx.batch.Main,--files,s3://mybucket/oryx.conf,--driver-memory,1g,--driver-java-options,"-Dconfig.file=oryx.conf",--executor-memory,4g,--executor-cores,8,--conf,spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf",--conf,spark.ui.port=4040,--conf,spark.io.compression.codec=lzf,--conf,spark.logConf=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.speculation=true,--conf,spark.ui.showConsoleProgress=false,--num-executors=4,s3://mybucket/oryx-batch-2.3.0.jar],ActionOnFailure=CONTINUE
I then see the following error in the container:
17/04/18 14:15:00 ERROR JobScheduler: Error generating jobs for time 1492524900000 ms
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
Apparently this is caused by a Kafka/Spark version mismatch (the 0.8-based integration expects kafka.cluster.Broker, while the 0.9 jars on the classpath supply kafka.cluster.BrokerEndPoint in its place), but from what I can see I have followed the recommendations. Any ideas?
Created 04-18-2017 10:36 AM
First, I'll tell you that this is Quite Complicated and confuses me too. Matching Spark and Kafka versions is tricky, exacerbated by multiple and incompatible Kafka APIs, multiplied by slight differences in which versions are shipped in what CDH package.
(Yes there is no 2.4 yet, I put it in there as a 'preview'.)
I recall that I was actually not able to get the 2.3 release to pass tests with upstream components, and that's why it builds with the CDH profile enabled by default. I wanted to move on to Kafka 0.9 to enable security support, but that is only supported by Spark 2.x's kafka-0_10 integration component. And that wasn't yet available for CDH because it didn't work with the CDH Kafka 0.9. The kafka-0_8 component did work, but then that component didn't work when enabled against the standard Spark 2 distro. This is a nightmarish no-man's-land of version combos.
However master (2.4) should be clearer since it moves on to Kafka 0.10. It does work, or at least passes tests, vs Spark 2.1 and Kafka 0.10. In fact, I have a to-do to update the CDH dependencies too to get it working in master.
So: if you're making your own build for non-CDH components, can you try building 2.4 SNAPSHOT from master? If that's working I can hurry up getting the CDH part updated so we can cut a 2.4.0 release.
Created 04-19-2017 06:19 AM
Hi srowen, thanks for the response.
Below is my updated config:
My Kafka/Zookeeper hosts are using the following versions:
Oryx 2.3.0
kafka_2.11-0.10.0.0
spark-2.0.1-bin-hadoop2.7
My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1
I've also included the following jars on the cluster (master and slave nodes):
kafka_2.11-0.10.0.0/libs/*
spark-streaming-kafka-0-10_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar
I've built oryx-batch-2.4.0-SNAPSHOT.jar and oryx-serving-2.4.0-SNAPSHOT.jar from master. The batch layer appears to be running now.
I'm trying to confirm that it is indeed working by running the serving layer ingestion example.
wget --quiet --post-file data.csv --output-document - --header "Content-Type: text/csv" http://localhost:8080/ingest
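(For what it's worth, here's a rough Java equivalent of that wget call; just a sketch, with the localhost:8080 endpoint and data.csv path taken from the example above:)
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class IngestTest {
  public static void main(String[] args) throws Exception {
    // POST data.csv to the serving layer's /ingest endpoint, mirroring the wget call above
    URL url = new URL("http://localhost:8080/ingest");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "text/csv");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      Files.copy(Paths.get("data.csv"), out);
    }
    System.out.println("Response: HTTP " + conn.getResponseCode());
  }
}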
I've taken guidance from this post to update the runtime dependencies for the serving layer. (Updated spark-streaming-kafka-0-8 to spark-streaming-kafka-0-10).
https://github.com/OryxProject/oryx/issues/265
However I see two issues with the serving layer. On startup I see the following message repeatedly:
2017-04-19 09:11:50,199 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 09:11:50,401 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
I'm not sure what this means exactly, but apparently it might be something to do with SSL?
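To rule out a plain connectivity problem separate from Oryx itself, I've been using a minimal consumer probe along these lines (just a sketch; <kafka-host> is a placeholder, and it assumes the same kafka-clients jar the serving layer uses is on the classpath):
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BrokerProbe {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "<kafka-host>:9092");
    props.put("group.id", "broker-probe");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    // listTopics() forces a metadata round trip to the broker; a protocol or
    // security mismatch tends to surface here as the same "disconnected" warnings
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      System.out.println(consumer.listTopics().keySet());
    }
  }
}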
When I try the ingest step, I get the following error on the serving layer:
2017-04-19 09:13:22,271 INFO OryxApplication:65 Creating JAX-RS from endpoints in package(s) com.cloudera.oryx.app.serving,com.cloudera.oryx.app.serving.als,com.cloudera.oryx.lambda.serving
2017-04-19 09:13:22,437 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 09:13:22,497 INFO Reflections:232 Reflections took 225 ms to scan 1 urls, producing 17 keys and 106 values
Apr 19, 2017 9:13:22 AM org.apache.catalina.core.ApplicationContext log
SEVERE: StandardWrapper.Throwable
java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
    at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
    at org.reflections.Reflections.<init>(Reflections.java:126)
    at org.reflections.Reflections.<init>(Reflections.java:168)
    at org.reflections.Reflections.<init>(Reflections.java:141)
    at com.cloudera.oryx.lambda.serving.OryxApplication.doGetClasses(OryxApplication.java:69)
    at com.cloudera.oryx.lambda.serving.OryxApplication.getClasses(OryxApplication.java:57)
    at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig$3.run(ResourceConfig.java:1234)
    at org.glassfish.jersey.internal.Errors$2.call(Errors.java:289)
This apparently has something to do with the Guava versions on the classpath, although from what I can see the new runtime dependencies I've added use the same Guava version as the existing dependencies.
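One quick check is to print which jar the Guava Sets class is actually loaded from at runtime, e.g. with a small throwaway class run against the serving layer's classpath (just a sketch):
import com.google.common.collect.Sets;

public class GuavaCheck {
  public static void main(String[] args) {
    // Prints the jar that supplied the Guava Sets class; an older Guava earlier
    // on the classpath would explain the NoSuchMethodError above
    System.out.println(Sets.class.getProtectionDomain().getCodeSource().getLocation());
  }
}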
Created 04-19-2017 06:24 AM
You should probably use 2.4.0-SNAPSHOT, but also use Spark 2.1.0 rather than 2.0.x.
The last error is one I just managed to find and fix today when I produced some other updates. Try again with the very latest code.
Created 04-19-2017 08:28 AM
I've pulled and rebuilt 2.4.0-SNAPSHOT.
Kafka/Zookeeper hosts are using oryx-run.sh and oryx.conf (ALS example) and have been updated to use Spark 2.1.0.
I've also updated the cluster to use Spark 2.1.0.
Batch layer is running again.
The serving layer Guava issue is resolved, but I am still seeing the broker issue:
NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
Here's some output from when the serving layer starts up:
2017-04-19 10:45:31,496 INFO ConsumerConfig:180 ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [<kafka-host>:9092]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = OryxGroup-ServingLayer-1545f926-8dd9-499d-aaff-1e3a709c0645
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 16777216
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2017-04-19 10:45:54,297 INFO ProducerConfig:180 ProducerConfig values:
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [<kafka-host>:9092]
    buffer.memory = 33554432
    client.id = producer-1
    compression.type = gzip
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1000
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 67108864
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
When I run the ingest call, nothing seems to happen. I don't see any extra output on the serving layer or on the topics.
Created on 04-19-2017 08:46 AM - edited 04-19-2017 09:06 AM
Correction to my last post:
The ProducerConfig only shows up in the serving layer output once the ingest is invoked.
I also see the following:
Apr 19, 2017 12:04:01 PM org.apache.catalina.util.SessionIdGeneratorBase createSecureRandom
INFO: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [34,430] milliseconds.
2017-04-19 12:04:01,939 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,040 INFO OryxApplication:65 Creating JAX-RS from endpoints in package(s) com.cloudera.oryx.app.serving,com.cloudera.oryx.app.serving.als,com.cloudera.oryx.lambda.serving
2017-04-19 12:04:02,148 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,354 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,413 INFO Reflections:229 Reflections took 345 ms to scan 1 urls, producing 17 keys and 106 values
2017-04-19 12:04:02,572 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,681 INFO Reflections:229 Reflections took 219 ms to scan 1 urls, producing 10 keys and 64 values
2017-04-19 12:04:02,781 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,831 INFO Reflections:229 Reflections took 145 ms to scan 1 urls, producing 13 keys and 14 values
2017-04-19 12:04:02,998 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:03,216 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:03,439 WARN NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
Apr 19, 2017 12:04:03 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-nio2-8080"]
Created 04-19-2017 09:44 AM
I solved this!
The issue was that my Kafka/ZooKeeper hosts and my cluster were using kafka_2.11-0.10.0.0, while 2.4.0-SNAPSHOT uses Kafka 0.10.1.1.
I've updated everything to use kafka_2.11-0.10.1.1.
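In case it helps anyone else, a quick way to confirm which kafka-clients version is actually on a given classpath (a small sketch using the AppInfoParser utility that ships in kafka-clients):
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientsVersion {
  public static void main(String[] args) {
    // Prints the version of the kafka-clients jar on the classpath, which makes a
    // 0.10.0.0 vs 0.10.1.1 mismatch like the one above easy to spot
    System.out.println(AppInfoParser.getVersion());
  }
}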
Created 04-21-2017 08:16 AM
Oh, now I see the same 'disconnected' problem you did.
It turns out that Kafka 0.10.0 and 0.10.1 are not protocol-compatible, which is quite disappointing.
So I think I'm going to have to back up and revert master/2.4 to Kafka 0.10.0, because that's the flavor CDH is on and I'd like to avoid having two builds to support 0.10.0 vs 0.10.1. I hope that isn't a big deal to switch back in your prototype?
Created 04-21-2017 08:46 AM
No problem, I'll get around to testing that out on Monday.
Created 04-24-2017 09:34 AM
Hi, I can confirm my setup is working with 0.10.0.0.
I noticed one issue in the serving layer output (it was there before I made the change, so it is not new).
2017-04-24 16:26:54,052 INFO ALSServingModelManager:96 ALSServingModel[features:10, implicit:true, X:(877 users), Y:(1639 items, partitions: [0:1296, 1:343]...), fractionLoaded:1.0]
2017-04-24 16:26:54,053 INFO SolverCache:78 Computing cached solver
2017-04-24 16:26:54,111 INFO SolverCache:83 Computed new solver null
Should there be a null value in that last log message?
Here's the code in question, from com.cloudera.oryx.app.als.SolverCache:
if (newYTYSolver != null) {
  // This logs the `solver` reference, which has not been set yet at this point,
  // rather than `newYTYSolver`, which would explain the "null" in the output
  log.info("Computed new solver {}", solver);
  solver.set(newYTYSolver);
}