I mentioned the reason for the 'disconnected' message above; it's resolved now. There was no more detail, but I found this similar issue, which had me look into the Kafka versions:
OK, is it largely working then? If it looks like the app is running, then I'll move to test 2.4 on my cluster too and if it looks good, go ahead and cut a release.
The batch and serving layers look good. I'm testing out the speed layer now.
Here's a question. As per the architectural description (http://oryx.io/index.html), I see that historical data is stored in HDFS by the batch layer. The speed and serving layers only seem to interact with the Kafka topics for input and updates.
So my question is: is it acceptable for the batch and speed layers to actually run on separate Hadoop clusters? They are configured to use the same Kafka brokers. The reason I ask is that AWS EMR clusters only allow adding "steps" that run in sequential order, so my speed layer would never actually be launched on the batch layer's cluster unless the batch layer Spark job was killed or stopped.
Also, does the serving layer interact with HDFS in some way? I see that the Hadoop dependencies are needed for the jar, but I was under the impression it only interacted with the Kafka topics.
Yes, they're all only coupled by Kafka, so you could run these layers quite separately except that they need to share the brokers.
It probably won't fit EMR's model, as both layers should run concurrently and continuously. I'm not sure it can help you with a shared Kafka either.
Obviously it's also an option to run CDH on AWS if you want to try that.
Serving layer does not _generally_ use HDFS unless the model is so big that Kafka can't represent parts of it. Then it will write to HDFS and read from it. This really isn't great but it's the best I could do for now for really large models. This is something that could be improved at some point, I hope. If you tune Kafka to allow very large models you can get away without HDFS access.
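For reference, these are the standard Kafka properties that govern maximum message size; raising them is what "tune Kafka to allow very large models" amounts to. The values below are purely illustrative (~64 MB), not recommendations:

```properties
# server.properties (broker) -- illustrative values only
message.max.bytes=67108864          # largest single message the broker will accept
replica.fetch.max.bytes=67108864    # must be >= message.max.bytes so replicas can copy it

# consumer side
fetch.message.max.bytes=67108864    # old consumer API; the new consumer uses max.partition.fetch.bytes
```

All three limits have to be raised together, or a model update larger than the smallest of them will fail to propagate.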
I can confirm that the speed layer is working as expected now.
My current setup is that I have two separate EC2 instances and two separate EMR clusters. One EC2 instance is running Kafka, while the other is running Zookeeper and an instance of the serving layer. (This serving layer currently doesn't have any access to HDFS, so I will probably see some errors if the models get too large.)
One EMR cluster is running the batch layer and the other is running the speed layer. This approach seems to be working fine for now. My batch layer is even writing its output to S3. I didn't expect this to work straight out of the box, but it did. I just updated oryx.conf, and I guess Amazon's implementation of HDFS (EMRFS) takes care of the rest.
hdfs-base = "s3://mybucket/Oryx"
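For context, `hdfs-base` is just a substitution variable in the example oryx.conf that the storage directories reference, so redefining it redirects both. A sketch based on the example config (your other settings will differ):

```hocon
# oryx.conf (fragment) -- hdfs-base pointed at S3 instead of hdfs://
hdfs-base = "s3://mybucket/Oryx"

oryx.batch.storage {
  data-dir  = ${hdfs-base}"/data"
  model-dir = ${hdfs-base}"/model"
}
```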
Do you see any issues with this setup? (Apart from the Serving layer and HDFS access)
Although I haven't tested anything like that, it's just using really standard APIs in straightforward ways, so I'm not surprised that S3 just works, because HDFS can read/write S3 OK. I know there are some gotchas with actually using S3 as intermediate storage in Spark jobs, but I think your EMR jobs are using local HDFS for that.
I just spotted an issue with using S3.
java.lang.IllegalArgumentException: Wrong FS: s3://mybucket/Oryx/data/oryx-1492697400000.data, expected: hdfs://<master-node>:8020
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:653)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:71)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:35)
It's strange that it worked a few times and then crashed on this attempt.
Anyway, following guidance from this post:
I updated com.cloudera.oryx.lambda.batch.SaveToHDFSFunction, changing

FileSystem fs = FileSystem.get(hadoopConf);

to

FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);
This seems to have done the trick.
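For anyone hitting the same thing: `FileSystem.get(conf)` returns the filesystem for `fs.defaultFS` (HDFS on an EMR cluster), so its `checkPath` rejects an `s3://` path, while `FileSystem.get(uri, conf)` selects the filesystem by the URI's scheme. A minimal stdlib-only illustration of what `path.toUri()` hands over (no Hadoop dependency needed; the Hadoop calls above are the real fix, this just shows the scheme resolution):

```java
import java.net.URI;

public class SchemeDemo {
    public static void main(String[] args) {
        // The path the batch layer tried to write:
        URI path = URI.create("s3://mybucket/Oryx/data/oryx-1492697400000.data");

        // FileSystem.get(hadoopConf) ignores this URI and uses fs.defaultFS
        // (hdfs://<master-node>:8020 on EMR) -- hence the "Wrong FS" error.
        // FileSystem.get(path.toUri(), hadoopConf) dispatches on the scheme:
        System.out.println(path.getScheme());   // prints "s3"
        System.out.println(path.getHost());     // prints "mybucket"
    }
}
```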