
Oryx2 Kafka Broker Issue

Explorer

Hi, I'm attempting to get the ALS example in Oryx2 up and running using an AWS EMR cluster. 

I've been using the requirements matrix here: http://oryx.io/docs/admin.html

(Aside: I see Oryx 2.4.x in the matrix but don't see a 2.4.x release here? https://github.com/OryxProject/oryx/releases)

 

My Kafka/Zookeeper hosts are using the following versions:

Oryx 2.3.0
kafka_2.11-0.9.0.1

spark-2.0.1-bin-hadoop2.7

 

I can run

./oryx-run.sh kafka-setup

and create the topics successfully.

I can also run

./oryx-run.sh kafka-input --input-file data.csv

and see the messages being added to the topic.
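
If it helps to double-check the topic contents from code rather than the console tools, the sketch below is one way to do it with the 0.9 consumer API. The broker address and the OryxInput topic name are assumptions about my setup, not anything Oryx mandates.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class InputTopicCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Placeholder broker; use the same broker list as oryx.conf
    props.put("bootstrap.servers", "kafka-host:9092");
    props.put("group.id", "oryx-input-check");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");

    try (KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("OryxInput")); // assumed topic name
      // One poll is enough to print whatever the kafka-input step already wrote
      ConsumerRecords<String,String> records = consumer.poll(10000);
      for (ConsumerRecord<String,String> record : records) {
        System.out.println(record.offset() + "\t" + record.value());
      }
    }
  }
}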

My cluster is using the following versions:
Hadoop 2.7.3
Spark 2.0.1

 

I've also included the following jars on the cluster (master and slave nodes):

kafka_2.11-0.9.0.1/libs/*

spark-streaming-kafka-0-8_2.11-2.0.1.jar

spark-core_2.11-2.0.1.jar

 

According to the Spark documentation here:
https://spark.apache.org/docs/2.0.1/streaming-kafka-integration.html
spark-streaming-kafka-0-8 is suitable for broker version "0.8.2.1 or higher". 

 

I have adapted the spark-submit launch for the Batch layer into an EMR "step". The only change I've had to make is switching "deploy-mode" from client to cluster.

 

aws emr add-steps --cluster-id XXX --steps Type=spark,Name=OryxBatchLayer-ALSExample,Args=[--master,yarn,--deploy-mode,cluster,--name,OryxBatchLayer-ALSExample,--class,com.cloudera.oryx.batch.Main,--files,s3://mybucket/oryx.conf,--driver-memory,1g,--driver-java-options,"-Dconfig.file=oryx.conf",--executor-memory,4g,--executor-cores,8,--conf,spark.executor.extraJavaOptions="-Dconfig.file=oryx.conf",--conf,spark.ui.port=4040,--conf,spark.io.compression.codec=lzf,--conf,spark.logConf=true,--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,--conf,spark.speculation=true,--conf,spark.ui.showConsoleProgress=false,--num-executors=4,s3://mybucket/oryx-batch-2.3.0.jar],ActionOnFailure=CONTINUE

 

I then see the following error in the container:

17/04/18 14:15:00 ERROR JobScheduler: Error generating jobs for time 1492524900000 ms
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)



Apparently this is caused by a mismatch between the Kafka and Spark versions, but as far as I can see I have followed the recommendations. Any ideas?
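
(For reference, the same KafkaCluster metadata path can be exercised outside Oryx with a stripped-down direct-stream job along the lines below, which can help confirm whether the jar combination on the cluster is at fault. The broker address and topic name are placeholders for my setup.)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DirectStreamCheck {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("DirectStreamCheck");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));

    Map<String,String> kafkaParams = new HashMap<>();
    // Placeholder broker; same brokers the Oryx layers point at
    kafkaParams.put("metadata.broker.list", "kafka-host:9092");

    JavaPairInputDStream<String,String> stream = KafkaUtils.createDirectStream(
        jssc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams,
        Collections.singleton("OryxInput")); // assumed topic name

    // The broker metadata lookup that triggers the BrokerEndPoint/Broker
    // ClassCastException runs when each batch's offsets are computed
    stream.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}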

1 ACCEPTED SOLUTION

Master Collaborator

First, I'll tell you that this is Quite Complicated and confuses me too. Matching Spark and Kafka versions is tricky, exacerbated by multiple, incompatible Kafka APIs, multiplied by slight differences in which versions ship in which CDH package.

 

(Yes, there is no 2.4 release yet; I put it in the matrix as a 'preview'.)

 

I recall that I was actually not able to get the 2.3 release to pass tests with upstream components, which is why it builds with the CDH profile enabled by default. I wanted to move on to Kafka 0.9 to enable security support, but that is supported by Spark 2.x's kafka-0_10 integration component, and that wasn't yet available for CDH because it didn't work with CDH's Kafka 0.9. The kafka-0_8 component did work, but then that component didn't work when enabled against the standard Spark 2 distro. This is a nightmarish no-man's-land of version combos.

 

However, master (2.4) should be clearer since it moves on to Kafka 0.10. It does work, or at least passes tests, against Spark 2.1 and Kafka 0.10. In fact, I have a to-do to update the CDH dependencies too, to get it working in master.

 

So: if you're making your own build for non-CDH components, can you try building 2.4-SNAPSHOT from master? If that works, I can hurry up and get the CDH part updated so we can cut a 2.4.0 release.


27 REPLIES

Explorer

I mentioned the reason for the 'disconnected' message above; it's resolved now. There wasn't much more detail, but I found this similar issue, which prompted me to look into the Kafka versions:

http://stackoverflow.com/questions/42851834/apache-kafka-producer-networkclient-broker-server-discon...

Master Collaborator

OK, is it largely working then? If it looks like the app is running, then I'll test 2.4 on my cluster too and, if it looks good, go ahead and cut a release.

Explorer

The batch and serving layers look good. I'm testing out the speed layer now.

 

Here's a question. As per the architectural description (http://oryx.io/index.html), I see that historical data is stored in HDFS by the batch layer, while the speed and serving layers only seem to interact with the Kafka topics for input and updates.

 

So my question is: is it acceptable for the batch and speed layers to run on separate Hadoop clusters? They are configured to use the same Kafka brokers. The reason I ask is that AWS EMR clusters only allow adding "steps" that run in sequential order, so my speed layer would never actually be launched on the batch layer's cluster unless the batch layer Spark job was killed or stopped.

Also, does the serving layer interact with HDFS in some way? I see that the Hadoop dependencies are needed for the jar, but I was under the impression it only interacted with the Kafka topics.


Master Collaborator

Yes, they're all only coupled by Kafka, so you could run these layers quite separately except that they need to share the brokers.
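
To make "coupled only by Kafka" concrete: anything that can reach the shared brokers can feed the input topic, regardless of which cluster it runs on. A minimal sketch with the 0.9 producer API (the broker host, topic name, and input line are placeholders, not Oryx's own code):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedBrokerCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Placeholder for the shared broker list that all layers are configured with
    props.put("bootstrap.servers", "kafka-host:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String,String> producer = new KafkaProducer<>(props)) {
      // One line of input in the user,item,strength shape the ALS example consumes
      producer.send(new ProducerRecord<>("OryxInput", "someUser,someItem,1"));
    }
  }
}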

 

It probably won't fit EMR's model, as both layers should run concurrently and continuously. I'm not sure it can help you with a shared Kafka either.

 

Obviously it's also an option to run CDH on AWS if you want to try that.

 

The serving layer does not _generally_ use HDFS unless the model is so big that Kafka can't represent parts of it; in that case it will write the model to HDFS and read it back from there. This really isn't great, but it's the best I could do for now for really large models, and something I hope can be improved at some point. If you tune Kafka to accept messages large enough to hold the model, you can get away without HDFS access.

Explorer

I can confirm that the speed layer is working as expected now.

 

My current setup is that I have two separate EC2 instances and two separate EMR clusters. One EC2 instance is running Kafka, while the other is running ZooKeeper and an instance of the Serving layer. (This Serving layer currently doesn't have any access to HDFS, so I will probably see some errors if the models get too large.)

 

One EMR cluster is running the Batch layer and the other is running the Speed layer. This approach seems to be working fine for now. My Batch layer is even writing its output to S3. I didn't expect this to work straight out of the box, but it did: I just updated oryx.conf, and I guess Amazon's implementation of HDFS (EMRFS) takes care of the rest.

 

hdfs-base = "s3://mybucket/Oryx"
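
As a sanity check that EMRFS is really what's behind that s3:// path, something like this run on the EMR master node would show which FileSystem implementation Hadoop resolves for the scheme (the bucket is the same placeholder as above):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3FsCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    URI base = URI.create("s3://mybucket/Oryx"); // placeholder bucket/path

    // On EMR this should resolve to the EMRFS implementation, not DistributedFileSystem
    FileSystem fs = FileSystem.get(base, conf);
    System.out.println("s3:// is handled by " + fs.getClass().getName());
    System.out.println("base path exists? " + fs.exists(new Path(base)));
  }
}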


Do you see any issues with this setup? (Apart from the Serving layer and HDFS access)

Master Collaborator

Although I haven't tested anything like that, it's just using really standard APIs in straightforward ways, so I'm not surprised that S3 just works, since the Hadoop filesystem APIs can read/write S3 OK. I know there are some gotchas with actually using S3 as intermediate storage in Spark jobs, but I think your EMR jobs are using local HDFS for that.

Explorer

I just spotted an issue with using S3. 

 

 

java.lang.IllegalArgumentException: Wrong FS: s3://mybucket/Oryx/data/oryx-1492697400000.data, expected: hdfs://<master-node>:8020
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:653)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:71)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:35)


It's strange that it worked a few times and then crashed on this attempt. 

 

 

Anyway, following guidance from this post:
https://forums.aws.amazon.com/thread.jspa?threadID=30945

I updated com.cloudera.oryx.lambda.batch.SaveToHDFSFunction from:

FileSystem fs = FileSystem.get(hadoopConf);

to:

FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);

This seems to have done the trick.
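
For anyone hitting the same thing, this standalone sketch shows why the one-line change matters; the path is a placeholder and this is not the actual SaveToHDFSFunction code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WrongFsDemo {
  public static void main(String[] args) throws IOException {
    Configuration hadoopConf = new Configuration();
    Path path = new Path("s3://mybucket/Oryx/data/example.data"); // placeholder

    // FileSystem.get(hadoopConf) returns the cluster's default filesystem (fs.defaultFS,
    // i.e. HDFS on EMR), which rejects the s3:// path with the "Wrong FS" error above.
    // Resolving the filesystem from the path's own URI picks the matching implementation:
    FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);

    if (!fs.exists(path)) {
      System.out.println(path + " does not exist yet");
    }
  }
}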

Master Collaborator

Yes, good catch. I'll track that at https://github.com/OryxProject/oryx/issues/329 and fix it in a few minutes.