Support Questions

Find answers, ask questions, and share your expertise

Oryx2 Kafka Broker Issue

Explorer

Hi, I'm attempting to get the ALS example in Oryx2 up and running using an AWS EMR cluster. 

I've been using the requirements matrix here: http://oryx.io/docs/admin.html

(Aside: I see Oryx 2.4.x in the matrix but don't see a 2.4.x release here? https://github.com/OryxProject/oryx/releases)

 

My Kafka/Zookeeper hosts are using the following versions:

Oryx 2.3.0
kafka_2.11-0.9.0.1
spark-2.0.1-bin-hadoop2.7

 

I can run

./oryx-run.sh kafka-setup

and create the topics successfully.
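
As a sanity check, the topics can also be listed directly (Kafka bin path and Zookeeper host are illustrative):

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper <zookeeper-host>:2181 --list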

 

 

I can also run

./oryx-run.sh kafka-input --input-file data.csv

and see the messages being added to the topic.
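
To watch what's landing on the topic, the stock console consumer works (I'm assuming the ALS example's default input topic name, OryxInput; hosts are illustrative):

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper <zookeeper-host>:2181 --topic OryxInput --from-beginning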

My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1

 

I've also included the following jars on the cluster (master and slave nodes):

kafka_2.11-0.9.0.1/libs/*
spark-streaming-kafka-0-8_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar

 

According to the Spark streaming + Kafka integration docs:
https://spark.apache.org/docs/2.0.1/streaming-kafka-integration.html
spark-streaming-kafka-0-8 is suitable for broker version "0.8.2.1 or higher".
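
(An aside that helped me keep the versions straight: the artifact name encodes three different versions.)

spark-streaming-kafka-0-8_2.11-2.0.1.jar
#                     |   |    |
#                     |   |    Spark version (should match the cluster's Spark)
#                     |   Scala build
#                     Kafka API flavor the module targets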

 

I have adapted the spark-submit launch for the Batch layer into an EMR "step". The only change I've had to make is switching "deploy-mode" from client to cluster.

 

aws emr add-steps --cluster-id XXX --steps \
Type=spark,Name=OryxBatchLayer-ALSExample,\
Args=[--master,yarn,--deploy-mode,cluster,--name,OryxBatchLayer-ALSExample,\
--class,com.cloudera.oryx.batch.Main,--files,s3://mybucket/oryx.conf,\
--driver-memory,1g,--driver-java-options,"-Dconfig.file=oryx.conf",\
--executor-memory,4g,--executor-cores,8,\
--conf,spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf",\
--conf,spark.ui.port=4040,--conf,spark.io.compression.codec=lzf,\
--conf,spark.logConf=true,\
--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
--conf,spark.speculation=true,--conf,spark.ui.showConsoleProgress=false,\
--num-executors=4,s3://mybucket/oryx-batch-2.3.0.jar],ActionOnFailure=CONTINUE

 

I then see the following error in the container:

17/04/18 14:15:00 ERROR JobScheduler: Error generating jobs for time 1492524900000 ms
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)



Apparently this is caused by a Kafka/Spark version mismatch, but as far as I can see I have followed the recommendations. Any ideas?


1 ACCEPTED SOLUTION

Master Collaborator

First, I'll tell you that this is Quite Complicated and confuses me too. Matching Spark and Kafka versions is tricky, exacerbated by multiple and incompatible Kafka APIs, multiplied by slight differences in which versions are shipped in which CDH package.

 

(Yes, there is no 2.4 release yet; I put it in the matrix as a 'preview'.)

 

I recall that I was actually not able to get the 2.3 release to pass tests with upstream components, and that's why it builds with the CDH profile enabled by default. I wanted to move on to Kafka 0.9 to enable security support. But that is supported only by Spark 2.x's kafka-0-10 integration component. And that wasn't yet available for CDH because it didn't work with the CDH Kafka 0.9. But the kafka-0-8 component did work. But then that component didn't work when enabled with the standard Spark 2 distro. This is a nightmarish no-man's-land of version combos.

 

However, master (2.4) should be clearer since it moves on to Kafka 0.10. It does work, or at least passes tests, against Spark 2.1 and Kafka 0.10. In fact, I have a to-do to update the CDH dependencies too, to get it working in master.

 

So: if you're making your own build for non-CDH components, can you try building 2.4.0-SNAPSHOT from master? If that works, I can hurry up getting the CDH part updated so we can cut a 2.4.0 release.
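
Roughly, the build is just this (the exact flags for a non-CDH build are from memory, so double-check the build docs):

git clone https://github.com/OryxProject/oryx.git
cd oryx
mvn -DskipTests clean package   # activate the appropriate non-CDH profile if needed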


27 REPLIES


Explorer

Hi srowen, thanks for the response.

 

Below is my updated config:


My Kafka/Zookeeper hosts are using the following versions:
Oryx 2.3.0
kafka_2.11-0.10.0.0
spark-2.0.1-bin-hadoop2.7


My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1
 
In addition, I've also included the following jars on the cluster (master and slave nodes):
kafka_2.11-0.10.0.0/libs/*
spark-streaming-kafka-0-10_2.11-2.0.1.jar
spark-core_2.11-2.0.1.jar

I've built oryx-batch-2.4.0-SNAPSHOT.jar and oryx-serving-2.4.0-SNAPSHOT.jar from master. The batch layer appears to be running now.

 

I'm trying to confirm that it is indeed working by running the serving layer ingestion example.

wget --quiet --post-file data.csv --output-document -   --header "Content-Type: text/csv"   http://localhost:8080/ingest
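
(The curl equivalent, for anyone who prefers it:)

curl -X POST -H "Content-Type: text/csv" --data-binary @data.csv http://localhost:8080/ingest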

 

 

I've taken guidance from the post below to update the runtime dependencies for the serving layer (updated spark-streaming-kafka-0-8 to spark-streaming-kafka-0-10):


https://github.com/OryxProject/oryx/issues/265
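
If I've followed that guidance correctly, the swap on the serving-layer classpath amounts to something like this (paths illustrative):

# replace the Kafka 0.8 integration jar with the 0.10 build
rm spark-streaming-kafka-0-8_2.11-2.0.1.jar
cp /path/to/spark-streaming-kafka-0-10_2.11-2.0.1.jar .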

However, I see two issues with the serving layer. On startup, I see the following message repeatedly:

 

2017-04-19 09:11:50,199 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 09:11:50,401 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected

 
I'm not sure what this means exactly, but apparently it might be something to do with SSL?

 

When I try the ingest step, I get the following error on the serving layer:

  

2017-04-19 09:13:22,271 INFO  OryxApplication:65 Creating JAX-RS from endpoints in package(s) com.cloudera.oryx.app.serving,com.cloudera.oryx.app.serving.als,com.cloudera.oryx.lambda.serving
2017-04-19 09:13:22,437 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 09:13:22,497 INFO  Reflections:232 Reflections took 225 ms to scan 1 urls, producing 17 keys and 106 values
Apr 19, 2017 9:13:22 AM org.apache.catalina.core.ApplicationContext log
SEVERE: StandardWrapper.Throwable
java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
	at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
	at org.reflections.Reflections.<init>(Reflections.java:126)
	at org.reflections.Reflections.<init>(Reflections.java:168)
	at org.reflections.Reflections.<init>(Reflections.java:141)
	at com.cloudera.oryx.lambda.serving.OryxApplication.doGetClasses(OryxApplication.java:69)
	at com.cloudera.oryx.lambda.serving.OryxApplication.getClasses(OryxApplication.java:57)
	at org.glassfish.jersey.server.ResourceConfig$RuntimeConfig$3.run(ResourceConfig.java:1234)
	at org.glassfish.jersey.internal.Errors$2.call(Errors.java:289)

 

This apparently has something to do with the guava versions on the classpath, although from what I can see, the new runtime dependencies I have added use the same guava version as the existing dependencies.
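
(For reference, one way to check which guava version is where; paths and jar names illustrative:)

# standalone guava jars carry the version in the file name
ls /path/to/serving/classpath/ | grep -i guava
# for an uber jar, check whether guava classes are bundled inside it
unzip -l oryx-serving-2.4.0-SNAPSHOT.jar | grep -c 'com/google/common'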

Master Collaborator

You should probably use 2.4.0-SNAPSHOT, but also use Spark 2.1.0 rather than 2.0.x.

 

The last error is one I just managed to find and fix today when I produced some other updates. Try again with the very latest code.

Explorer

I've pulled and rebuilt 2.4.0-SNAPSHOT.

Kafka/Zookeeper hosts are using oryx-run.sh and oryx.conf (ALS example) and have been updated to use Spark 2.1.0.

 

I've updated the cluster to use Spark 2.1.0 also.

Batch layer is running again.

 

Serving layer guava issue is resolved but I am still seeing the broker issue:

NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
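
(To rule out plain network connectivity as the cause; host/port illustrative:)

nc -vz <kafka-host> 9092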

 

Here's some output from when the serving layer starts up:

2017-04-19 10:45:31,496 INFO  ConsumerConfig:180 ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [<kafka-host>:9092]
	check.crcs = true
	client.id = consumer-1
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = OryxGroup-ServingLayer-1545f926-8dd9-499d-aaff-1e3a709c0645
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 16777216
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2017-04-19 10:45:54,297 INFO  ProducerConfig:180 ProducerConfig values:
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [<kafka-host>:9092]
	buffer.memory = 33554432
	client.id = producer-1
	compression.type = gzip
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1000
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 67108864
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

When I run the ingest call, nothing seems to happen. I don't see any extra output on the serving layer or on the topics.

 

Explorer

Correction to my last post:

The ProducerConfig only shows in the serving layer output once the ingest is invoked.

 

I also see the following:

 

Apr 19, 2017 12:04:01 PM org.apache.catalina.util.SessionIdGeneratorBase createSecureRandom
INFO: Creation of SecureRandom instance for session ID generation using [SHA1PRNG] took [34,430] milliseconds.
2017-04-19 12:04:01,939 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,040 INFO  OryxApplication:65 Creating JAX-RS from endpoints in package(s) com.cloudera.oryx.app.serving,com.cloudera.oryx.app.serving.als,com.cloudera.oryx.lambda.serving
2017-04-19 12:04:02,148 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,354 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,413 INFO  Reflections:229 Reflections took 345 ms to scan 1 urls, producing 17 keys and 106 values
2017-04-19 12:04:02,572 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,681 INFO  Reflections:229 Reflections took 219 ms to scan 1 urls, producing 10 keys and 64 values
2017-04-19 12:04:02,781 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:02,831 INFO  Reflections:229 Reflections took 145 ms to scan 1 urls, producing 13 keys and 14 values
2017-04-19 12:04:02,998 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:03,216 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
2017-04-19 12:04:03,439 WARN  NetworkClient:568 Bootstrap broker <kafka-host>:9092 disconnected
Apr 19, 2017 12:04:03 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-nio2-8080"]

Explorer

I solved this!

The issue was that my Kafka/Zookeeper hosts and my cluster were using kafka_2.11-0.10.0.0, while 2.4.0-SNAPSHOT is using Kafka 0.10.1.1.

 

I've updated to use kafka_2.11-0.10.1.1.
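
For anyone hitting the same thing, comparing the broker build against what the app pulls in makes the mismatch visible (paths illustrative):

# broker side: the version is embedded in the jar names
ls $KAFKA_HOME/libs/ | grep kafka
# app side: in the Oryx source tree, list which Kafka artifacts the build resolves
mvn dependency:list | grep kafka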

Master Collaborator

Oh, now I see the same 'disconnected' problem you did.

It turns out that Kafka 0.10.0 and 0.10.1 are not protocol-compatible, which is quite disappointing.

So I think I'm going to have to back up and revert master/2.4 to Kafka 0.10.0, because that's the flavor that CDH is on, and I'd like to avoid having two builds to support 0.10.0 vs 0.10.1. I hope that isn't a big deal to switch back in your prototype?

Explorer

No problem, I'll get around to testing that out on Monday. 

Explorer

Hi, I can confirm my setup is working with 0.10.0.0.

I noticed one issue in the serving layer output (it was there before I made the change, so it is not new).

2017-04-24 16:26:54,052 INFO  ALSServingModelManager:96 ALSServingModel[features:10, implicit:true, X:(877 users), Y:(1639 items, partitions: [0:1296, 1:343]...), fractionLoaded:1.0]
2017-04-24 16:26:54,053 INFO  SolverCache:78 Computing cached solver
2017-04-24 16:26:54,111 INFO  SolverCache:83 Computed new solver null


Should there be a null value in that last log message?

Here's the code in question, from com.cloudera.oryx.app.als.SolverCache:

          if (newYTYSolver != null) {
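            // note: this logs 'solver' (still holding its previous value, null on the
            // first computation) rather than newYTYSolver, which is presumably why
            // "null" appears in the output above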
            log.info("Computed new solver {}", solver);
            solver.set(newYTYSolver);
          }