Explorer
Posts: 27
Registered: 01-06-2016

Re: Oryx2 Kafka Broker Issue

The batch and serving layers look good. I'm testing out the speed layer now.

Here's a question. As per the architectural description (http://oryx.io/index.html), historical data is stored in HDFS by the batch layer, while the speed and serving layers only seem to interact with the Kafka topics for input and updates.

So my question is: is it acceptable for the batch and speed layers to actually run on separate Hadoop clusters? They are configured to use the same Kafka brokers. I ask because AWS EMR clusters only allow adding "steps" that run in sequential order, so my speed layer would never actually be launched on the batch layer cluster unless the batch layer Spark job were killed or stopped.

Also, does the serving layer interact with HDFS in some way? I see that the Hadoop dependencies are needed for the jar, but I was under the impression it only interacted with the Kafka topics.


Cloudera Employee
Posts: 447
Registered: 08-11-2014

Re: Oryx2 Kafka Broker Issue

Yes, they're all coupled only by Kafka, so you could run these layers quite separately, except that they need to share the brokers.

It probably won't fit EMR's model, as both layers should run concurrently and continuously. I'm not sure if EMR can help you with a shared Kafka either.

Obviously it's also an option to run CDH on AWS if you want to try that.

The serving layer does not _generally_ use HDFS unless the model is so big that Kafka can't represent parts of it. In that case it will write to HDFS and read from it. This really isn't great, but it's the best I could do for now for really large models, and I hope it can be improved at some point. If you tune Kafka to allow very large models, you can get away without HDFS access.
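One way to picture the "Kafka first, HDFS as a fallback" behavior is a publisher that sends the serialized model inline when it fits under a size limit, and otherwise sends only a reference to a copy on shared storage. The minimal Java sketch below assumes that design; the class, the `MODEL`/`MODEL-REF` keys, the limit and the example paths are hypothetical and chosen for illustration, not Oryx's actual code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not Oryx's code: key names, limit and paths are assumptions.
public final class ModelPublishSketch {

  /** A message for the update topic: a key saying how to read the payload, plus the payload. */
  record UpdateMessage(String key, String payload) {}

  /**
   * Sends the model inline if its UTF-8 size fits in maxBytes; otherwise assumes the
   * model was already written to storagePath (e.g. on HDFS) and sends only that path.
   */
  static UpdateMessage toUpdateMessage(String serializedModel, String storagePath, int maxBytes) {
    int size = serializedModel.getBytes(StandardCharsets.UTF_8).length;
    if (size <= maxBytes) {
      return new UpdateMessage("MODEL", serializedModel); // whole model travels through Kafka
    }
    return new UpdateMessage("MODEL-REF", storagePath); // the reader must be able to open this path
  }

  public static void main(String[] args) {
    UpdateMessage small = toUpdateMessage("<PMML>...</PMML>", "hdfs:///oryx/model/1", 1024);
    UpdateMessage large = toUpdateMessage("x".repeat(2048), "hdfs:///oryx/model/2", 1024);
    System.out.println(small.key());                         // MODEL
    System.out.println(large.key() + " " + large.payload()); // MODEL-REF hdfs:///oryx/model/2
  }
}
```

Under this assumption, a serving layer without HDFS access keeps working only while every model fits under the limit. On the Kafka side, allowing larger messages usually means raising settings such as the broker's `message.max.bytes` and the producer's `max.request.size`.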

Explorer
Posts: 27
Registered: 01-06-2016

Re: Oryx2 Kafka Broker Issue

I can confirm that the speed layer is working as expected now.

My current setup is two separate EC2 instances and two separate EMR clusters. One EC2 instance runs Kafka, while the other runs ZooKeeper and an instance of the serving layer. (This serving layer currently doesn't have any access to HDFS, so I will probably see some errors if the models get too large.)

One EMR cluster runs the batch layer and the other runs the speed layer. This approach seems to be working fine for now. My batch layer is even writing its output to S3. I didn't expect this to work straight out of the box, but it did. I just updated oryx.conf, and I guess EMRFS (Amazon's implementation of the Hadoop file system interface on top of S3) takes care of the rest:

hdfs-base = "s3://mybucket/Oryx"


Do you see any issues with this setup? (Apart from the Serving layer and HDFS access)

Cloudera Employee
Posts: 447
Registered: 08-11-2014

Re: Oryx2 Kafka Broker Issue

Although I haven't tested anything like that, it's just using really standard APIs in straightforward ways, so I'm not surprised that S3 just works, because the Hadoop file system APIs can read/write S3 OK. I know there are some gotchas with actually using S3 as intermediate storage in Spark jobs, but I think your EMR jobs are using local HDFS for that.

Explorer
Posts: 27
Registered: 01-06-2016

Re: Oryx2 Kafka Broker Issue

I just spotted an issue with using S3.
java.lang.IllegalArgumentException: Wrong FS: s3://mybucket/Oryx/data/oryx-1492697400000.data, expected: hdfs://<master-node>:8020
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:653)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
	at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:71)
	at com.cloudera.oryx.lambda.batch.SaveToHDFSFunction.call(SaveToHDFSFunction.java:35)


It's strange that it worked a few times and then crashed on this attempt.

Anyway, following guidance from this post:
https://forums.aws.amazon.com/thread.jspa?threadID=30945

I updated com.cloudera.oryx.lambda.batch.SaveToHDFSFunction

from:

FileSystem fs = FileSystem.get(hadoopConf);

to:

FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);


This seems to have done the trick.
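The fix works because `FileSystem.get(conf)` always returns the cluster's default file system (here `hdfs://<master-node>:8020`), which rejects an `s3://` path, whereas `FileSystem.get(path.toUri(), conf)` picks the file system from the path's own scheme and authority (Hadoop's `path.getFileSystem(conf)` is an equivalent shorthand). The stdlib-only Java sketch below imitates that check with `java.net.URI`; `FakeFileSystem`, `getDefault`, `getForPath` and the host name `master-node` are hypothetical stand-ins, not Hadoop classes, and the real `checkPath` handles more cases, such as default ports.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical stdlib-only model of Hadoop's "Wrong FS" check; not Hadoop's code.
public final class WrongFsSketch {

  /** Stand-in for a file system bound to one scheme and authority. */
  static final class FakeFileSystem {
    final URI uri;

    FakeFileSystem(URI uri) {
      this.uri = uri;
    }

    /** Rejects fully qualified paths that belong to another file system. */
    void checkPath(URI path) {
      if (path.getScheme() == null) {
        return; // scheme-less paths resolve against this file system
      }
      boolean sameScheme = path.getScheme().equalsIgnoreCase(uri.getScheme());
      boolean sameAuthority = Objects.equals(path.getAuthority(), uri.getAuthority());
      if (!sameScheme || !sameAuthority) {
        throw new IllegalArgumentException("Wrong FS: " + path + ", expected: " + uri);
      }
    }
  }

  /** Like FileSystem.get(conf): always the cluster's default file system. */
  static FakeFileSystem getDefault(URI defaultFs) {
    return new FakeFileSystem(defaultFs);
  }

  /** Like FileSystem.get(path.toUri(), conf); assumes a fully qualified path. */
  static FakeFileSystem getForPath(URI path) {
    return new FakeFileSystem(URI.create(path.getScheme() + "://" + path.getAuthority()));
  }

  public static void main(String[] args) {
    URI defaultFs = URI.create("hdfs://master-node:8020"); // hypothetical host
    URI data = URI.create("s3://mybucket/Oryx/data/oryx-1492697400000.data");

    try {
      getDefault(defaultFs).checkPath(data); // original code path: throws
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
    getForPath(data).checkPath(data); // patched code path: no exception
    System.out.println("ok");
  }
}
```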

Cloudera Employee
Posts: 447
Registered: 08-11-2014

Re: Oryx2 Kafka Broker Issue

Yes, good catch. I'll track that at https://github.com/OryxProject/oryx/issues/329 and fix it in a few minutes.

Cloudera Employee
Posts: 447
Registered: 08-11-2014

Re: Oryx2 Kafka Broker Issue

Oh, now I see the same 'disconnected' problem you did.

It turns out that Kafka 0.10.0 and 0.10.1 are not protocol-compatible, which is quite disappointing.

So I think I'm going to have to back up and revert master/2.4 to Kafka 0.10.0, because that's the flavor CDH is on, and I would like to avoid having two builds to support 0.10.0 vs 0.10.1. I hope it isn't a big deal to switch back in your prototype?

Explorer
Posts: 27
Registered: 01-06-2016

Re: Oryx2 Kafka Broker Issue

No problem, I'll get around to testing that out on Monday. 

Explorer
Posts: 27
Registered: 01-06-2016

Re: Oryx2 Kafka Broker Issue

Hi, I can confirm my setup is working with 0.10.0.0.

I noticed one issue in the serving layer output (I noticed this before I made the change, so it is not new).

2017-04-24 16:26:54,052 INFO  ALSServingModelManager:96 ALSServingModel[features:10, implicit:true, X:(877 users), Y:(1639 items, partitions: [0:1296, 1:343]...), fractionLoaded:1.0]
2017-04-24 16:26:54,053 INFO  SolverCache:78 Computing cached solver
2017-04-24 16:26:54,111 INFO  SolverCache:83 Computed new solver null


Should there be a null value in this last output message?

Here's the code in question, from com.cloudera.oryx.app.als.SolverCache:

          if (newYTYSolver != null) {
            log.info("Computed new solver {}", solver);
            solver.set(newYTYSolver);
          }

Cloudera Employee
Posts: 447
Registered: 08-11-2014

Re: Oryx2 Kafka Broker Issue

(BTW, I went ahead and made a 2.4.0 release to have something official and probably-working out there. It worked on my CDH 5.11 + Spark 2.1 + Kafka 0.10.0 cluster.)

Yes, that's a minor problem in the log message. It should reference newYTYSolver. I'll fix that, but it shouldn't otherwise affect anything.
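The `null` in the log line is consistent with `solver` being a holder such as `java.util.concurrent.atomic.AtomicReference` that is logged before `set` is called: `AtomicReference.toString()` prints the current value, which is still `null` the first time a solver is computed (and the previous solver on later runs). The sketch below assumes that field type; `Solver` is a hypothetical stand-in record, and plain string concatenation replaces the SLF4J `{}` placeholder to keep it dependency-free.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the logging slip; Solver and the field type are assumptions.
public final class SolverLogSketch {

  /** Stand-in for the real solver type. */
  record Solver(int features) {}

  private final AtomicReference<Solver> solver = new AtomicReference<>();

  /** As quoted: logs the holder, whose toString() shows the value before set(). */
  String updateAsQuoted(Solver newYTYSolver) {
    String message = null;
    if (newYTYSolver != null) {
      message = "Computed new solver " + solver; // old value: null on the first run
      solver.set(newYTYSolver);
    }
    return message;
  }

  /** Fixed: logs the solver that was just computed. */
  String updateFixed(Solver newYTYSolver) {
    String message = null;
    if (newYTYSolver != null) {
      message = "Computed new solver " + newYTYSolver;
      solver.set(newYTYSolver);
    }
    return message;
  }

  public static void main(String[] args) {
    System.out.println(new SolverLogSketch().updateAsQuoted(new Solver(10))); // Computed new solver null
    System.out.println(new SolverLogSketch().updateFixed(new Solver(10)));
  }
}
```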
