
Oryx ALS running with Hadoop

Explorer

Sean,

 

We are running Oryx on Hadoop, and it converges around iteration 13.

However, the same dataset with the same training parameters takes about 120-130 iterations to converge on a single local VM (not running on Hadoop).

This doesn't seem to make sense. I would expect the number of iterations not to depend on the platform (Hadoop or local computation on one VM); it should depend on the training parameters, the convergence threshold, and the initial value of Y. In other words, I expect to see a similar number of iterations on Hadoop and on a single local VM.

When running on Hadoop, I noticed the log messages below. It looks like convergence happens after so few iterations because there are no samples, so an "artificial convergence" value is used. I did not see a similar message on the single local VM; there it shows something like "Avg absolute difference in estimate vs prior iteration over 18163 samples: 0.20480296387324523". So I think this may be the issue.

Any suggestions or thoughts on why this happens?

 

Tue Jun 09 22:14:38 PDT 2015 INFO No samples for convergence; using artificial convergence value: 6.103515625E-5
Tue Jun 09 22:14:38 PDT 2015 INFO Converged

 

Thanks.

 

Jason

Accepted solution

Master Collaborator

Ah OK, I think I understand this now. I made two small mistakes here. First, I overlooked that you actually have a small number of items (about a thousand, right?), which matches the number of records going into the sampling function. Second, on re-reading the code I see that the job is invoked over _items_, so the loop is really over items and then users, despite the names in the code. That is why there is so little input to this job: the inputs are item IDs.

So choosing the sampling rate based on the number of reducers is a little problematic, though reasonable. The number of reducers you have may suit the number of users but not the number of items, and the two can be very different. This is a somewhat deeper suboptimality, since in your case the numbers of users and items are very different. Normally it just means the item jobs in each iteration have more reducers than necessary, which adds a little overhead. But here it has also caused an actual problem for the way this convergence heuristic works.
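To make the items-vs-users point concrete, here is a back-of-the-envelope Java sketch. It assumes hash codes behave like uniform random values modulo the sampling modulus (the same assumption the sampling relies on) and plugs in figures from this thread: a modulus of 3673, roughly 7.6 million users, and 1,190 item records. The class and method names are hypothetical, not part of Oryx.

```java
// Back-of-the-envelope estimate of how many IDs pass "hash % modulus == 0",
// ASSUMING hash codes are uniformly distributed modulo the modulus.
final class SampleEstimate {

    // Expected number of IDs (out of idCount) whose hash is 0 mod modulus.
    static double expectedSampled(long idCount, int modulus) {
        return (double) idCount / modulus;
    }

    // Probability that none of idCount IDs is sampled, treating each ID
    // as an independent trial that succeeds with probability 1/modulus.
    static double probabilityNoneSampled(long idCount, int modulus) {
        return Math.pow(1.0 - 1.0 / modulus, idCount);
    }

    public static void main(String[] args) {
        int modulus = 3673;        // value discussed in this thread for 30 reducers
        long users = 7_600_000L;   // approximate user count from the thread
        long items = 1_190L;       // records into the job, per the MapReduce counters
        System.out.printf("users expected to be sampled: %.1f%n", expectedSampled(users, modulus));
        System.out.printf("items expected to be sampled: %.3f%n", expectedSampled(items, modulus));
        System.out.printf("P(no item sampled): %.3f%n", probabilityNoneSampled(items, modulus));
    }
}
```

With these inputs, about 2,069 users but only about 0.32 items are expected to pass, so under the uniformity assumption there is roughly a 72% chance that no item is sampled at all. Since a pair is emitted only when both of its IDs pass, no item passing means no convergence samples.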

 

One option is to let the user override the sampling rate, but that seems like something the user shouldn't have to set.

Another option is to expose separate control over the number of reducers for the user- and item-related jobs. That might be a good idea for the reasons above, although it's a somewhat separate issue.

More directly, I'm going to look at ways to efficiently count the number of users and items and choose a sampling rate accordingly. If the rate is too low, nothing is sampled; if it's too high, sampling takes a long time. I had hoped to avoid an extra job just to do this counting, but maybe there is an efficient way to figure it out.
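The idea of deriving the sampling rate from counts of users and items could look roughly like this. The sketch is hypothetical: it assumes the counts are already known (for example from an extra counting job or job counters) and targets an arbitrary 100 sampled IDs per dimension; none of these names exist in Oryx.

```java
// HYPOTHETICAL: derive the sampling modulus from an ID count instead of
// from the number of reducers. Not Oryx code; names are illustrative.
final class CountBasedModulus {

    // Choose a modulus so that roughly targetSamples of idCount IDs pass
    // "hash % modulus == 0" (assuming uniform hashes). The result is never
    // below 1, so a tiny ID set is sampled in full rather than not at all.
    static int modulusFor(long idCount, long targetSamples) {
        if (idCount <= 0 || targetSamples <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        long m = idCount / targetSamples;
        return (int) Math.max(1L, Math.min(m, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Separate moduli for the two dimensions, each aiming for ~100 sampled IDs
        System.out.println("user modulus: " + modulusFor(7_600_000L, 100));
        System.out.println("item modulus: " + modulusFor(1_190L, 100));
    }
}
```

Because a pair is emitted only when both of its IDs pass, the number of sampled estimates is roughly the product of the two per-dimension targets, so the targets would need to stay modest.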

 

Let me do some homework.


27 replies

Master Collaborator

OK, then it was a reasonable fix, but it actually wouldn't have affected you anyway, given that your IDs are strings.

I can't see why it wouldn't sample any of the IDs. Their string hash codes ought to be fairly well distributed, so you should get reasonably close to the desired fraction of IDs sampled. You see the "Yconvergence" dir, so the right jobs are running, but there's no output (just _SUCCESS), which suggests that everything is working except that no IDs are being output.

I'd like to know what happens to these IDs inside ConvergenceSampleFn, but I know you can't share the IDs. Is it possible to run just that snippet of code on a bunch of IDs to see what they hash to? Or to add a few logging statements and re-run on your end to see what happens?

 

    @Override
    public void process(Pair<Long, float[]> input, Emitter<String> emitter) {
      String userIDString = input.first().toString();
      // Sample this row only if the hash of its ID string is 0 mod the sampling modulus
      if (userIDString.hashCode() % convergenceSamplingModulus == 0) {
        float[] xu = input.second();
        // Pair it with every entry in Y whose ID string also hashes to 0 mod the modulus
        for (LongObjectMap.MapEntry<float[]> entry : yState.getY().entrySet()) {
          long itemID = entry.getKey();
          if (Long.toString(itemID).hashCode() % convergenceSamplingModulus == 0) {
            // Current estimate for this pair: dot product of the two feature vectors
            float estimate = (float) SimpleVectorMath.dot(xu, entry.getValue());
            emitter.emit(DelimitedDataUtils.encode(',', userIDString, itemID, estimate));
          }
        }
      }
    }

Explorer

Sean,

 

Thanks for the follow up.

Yes, I can try that. Could you insert the appropriate log.info statements into the code you want me to try, so that it logs the information you need to review?

Meanwhile, I tried reducing the number of reducers (from 30 to 10), and I noticed that it did sample to calculate the convergence distance. I checked the code, and it looks like the number of reducers is used to generate the modulus.

 

For example:

Avg absolute difference in estimate vs prior iteration over 2124 samples: 0.02002799961913492

 

Jason

Master Collaborator

It's good that it works with a different value, but I can't figure out why that would be. Obviously it has something to do with the IDs. These two extra log statements in ConvergenceSampleFn will print all of the hash codes:

 

    @Override
    public void process(Pair<Long, float[]> input, Emitter<String> emitter) {
      String userIDString = input.first().toString();
      log.info(Integer.toString(userIDString.hashCode()));
      if (userIDString.hashCode() % convergenceSamplingModulus == 0) {
        float[] xu = input.second();
        for (LongObjectMap.MapEntry<float[]> entry : yState.getY().entrySet()) {
          long itemID = entry.getKey();
          log.info(Integer.toString(Long.toString(itemID).hashCode()));
          if (Long.toString(itemID).hashCode() % convergenceSamplingModulus == 0) {
            float estimate = (float) SimpleVectorMath.dot(xu, entry.getValue());
            emitter.emit(DelimitedDataUtils.encode(',', userIDString, itemID, estimate));
          }
        }
      }
    }

Explorer

Sean,

 

(1) Here are some results:

INFO [main] com.cloudera.oryx.als.computation.iterate.row.ConvergenceSampleFn: userIDString= 2111185186541130611 hashCode= 977794330
INFO [main] com.cloudera.oryx.als.computation.iterate.row.ConvergenceSampleFn: userIDString= 3174317317673160368 hashCode= 463078209
INFO [main] com.cloudera.oryx.als.computation.iterate.row.ConvergenceSampleFn: userIDString= 3174428972624599832 hashCode= 1617905253
INFO [main] com.cloudera.oryx.als.computation.iterate.row.ConvergenceSampleFn: userIDString= 3444764202548713566 hashCode= 1628781813
INFO [main] com.cloudera.oryx.als.computation.iterate.row.ConvergenceSampleFn: userIDString= 3653094543606133455 hashCode= 1773010709

(2) In the Hadoop log, I do not see any output from the following statement:

log.info(Integer.toString(Long.toString(itemID).hashCode()));

Based on this, it seems that no ID passes the "if (userIDString.hashCode() % convergenceSamplingModulus == 0)" check.

(3) Can you explain in general how the sampling works?

(a) Does it sample in each reducer of each iteration?

(b) When it samples, does it loop over all the Long user IDs and Long item IDs and then apply the modulus? I see you use hashCode in the new code; Oryx 1.0.1 applies the modulus to the Long IDs directly.

(c) int modulus = RandomUtils.nextTwinPrime(4 * opts.getNumReducers() * opts.getNumReducers());

Why did you choose the modulus this way?

 

Thanks.

Master Collaborator

Yes, that's strange, since about 1 in 3673 IDs should pass this check. Here's a quick demo of the same idea using some Scala one-liners:

    val r = new scala.util.Random
    (0 to 10000000).par.count(x => r.nextLong.toString.hashCode % 3673 == 0)
    // 2854
    10000000/2854
    // 3503

So about 1 in 3503 random IDs was sampled, close to the expected 1 in 3673. The idea ought to be sound. How much input do you have, i.e. how many user IDs? It's a reasonably large number, right?

Sampling simply relies on the hash codes being uniformly distributed, which is fine.

 

Yes, the problem was that the IDs themselves are sometimes not uniformly distributed, but hashing them should always fix that.
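The point about non-uniform IDs can be illustrated with a small Java sketch. The ID pattern is hypothetical: every ID leaves remainder 1 when divided by the modulus. Applying the modulus to the raw long (which, as noted earlier in the thread, is what Oryx 1.0.1 does) samples none of them, while testing the hash of the decimal string, as ConvergenceSampleFn does, is not tied to that arithmetic pattern.

```java
// Illustration with a HYPOTHETICAL ID pattern: if raw IDs share structure with
// the modulus, "id % m == 0" can sample nothing, whereas hashing the decimal
// string first breaks that structure.
final class RawVersusHashed {

    // Hypothetical IDs that all leave remainder 1 when divided by m.
    static long[] idsCongruentToOne(int count, int m) {
        long[] ids = new long[count];
        for (int i = 0; i < count; i++) {
            ids[i] = (long) i * m + 1;
        }
        return ids;
    }

    // Sampling on the raw numeric ID.
    static long rawSampled(long[] ids, int m) {
        long n = 0;
        for (long id : ids) {
            if (id % m == 0) n++;
        }
        return n;
    }

    // Sampling on the hash of the ID's decimal string, as in ConvergenceSampleFn.
    static long hashedSampled(long[] ids, int m) {
        long n = 0;
        for (long id : ids) {
            if (Long.toString(id).hashCode() % m == 0) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        int m = 3673;
        long[] ids = idsCongruentToOne(1_000_000, m);
        System.out.println("raw modulus sampled:    " + rawSampled(ids, m));
        System.out.println("hashed modulus sampled: " + hashedSampled(ids, m)
                + " (a uniform hash would give about " + ids.length / m + ")");
    }
}
```

How many IDs the hashed test picks for any particular ID set is an empirical question, so the sketch only prints that count rather than relying on a specific value.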

 

Yes, sampling is per iteration and samples the same IDs each time.

 

The sample size is meant to scale with the input size, but the job doesn't know the input size, so the number of reducers is used as a proxy. The formula was determined empirically.
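For reference, the formula quoted in the question, nextTwinPrime(4 * numReducers * numReducers), can be sketched as follows. The twin-prime helper is this sketch's own assumption about the semantics (return the larger member of the first twin-prime pair whose larger member is at or above the argument); it is not Oryx's RandomUtils implementation, although it does reproduce the modulus of 3673 cited for 30 reducers.

```java
// Sketch of: modulus = nextTwinPrime(4 * numReducers * numReducers)
// nextTwinPrime below is an ASSUMED reimplementation, not Oryx's RandomUtils.
final class TwinPrimeModulus {

    // Simple trial-division primality test.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (int d = 3; (long) d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Smallest p >= n such that p and p - 2 are both prime (5 is the smallest such p).
    static int nextTwinPrime(int n) {
        int p = Math.max(n, 5);
        while (!(isPrime(p) && isPrime(p - 2))) {
            p++;
        }
        return p;
    }

    // The modulus grows with the square of the reducer count.
    static int modulusFor(int numReducers) {
        return nextTwinPrime(4 * numReducers * numReducers);
    }

    public static void main(String[] args) {
        for (int reducers : new int[] {10, 30}) {
            System.out.println(reducers + " reducers -> modulus " + modulusFor(reducers));
        }
    }
}
```

Under this sketch, 10 reducers give a modulus of 421; whether Oryx produces the same value depends on its actual nextTwinPrime. Because the modulus grows quadratically with the reducer count, a job over a small ID set (such as about 1,200 items) quickly reaches the point where few or none of its IDs are expected to pass.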

Explorer

Hm... it's strange that no IDs passed.

We have 7.6 million user IDs.

A question about "...Yes, sampling is per iteration and samples the same IDs each time...":

Take an example with 30 reducers, say in iteration 3:

(1) In iteration 3, reducer #1 loops over all the user IDs (and item IDs) in reducer #1.

(2) In iteration 3, reducer #2 loops over all the user IDs (and item IDs) in reducer #2.

Then, after iteration 3, does it save the sampled IDs, use the same sampled IDs in iteration 4, and compare the estimated values for these samples between the two iterations?

Master Collaborator

Yes, because the sampling rule is deterministic, the same IDs are sampled each time.

 

I'm fairly stumped by this one, as I can't work out why your user IDs would never get sampled. Clearly it has something to do with the modulus, since other, smaller values work. But that makes little sense unless your IDs' hash values weren't uniform, and they are hashed as strings.

Is it possible to compute the hash code of the string representation of all your IDs and see how many are 0 mod 3673? At least that would rule some basic things in or out.
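That check can be run outside Hadoop with a standalone Java sketch of the same test used in ConvergenceSampleFn: hash the ID's decimal string and test for 0 mod the modulus. The class and method names are hypothetical, the sample IDs are the ones from the log excerpt earlier in the thread, and List.of needs Java 9 or later.

```java
import java.util.List;

// Standalone version of the sampling test in ConvergenceSampleFn, for running
// over a list of IDs outside Hadoop. IDs are tested as the decimal strings of
// their long values, as in that snippet.
final class SampleCheck {

    static boolean sampled(long id, int modulus) {
        return Long.toString(id).hashCode() % modulus == 0;
    }

    static long countSampled(Iterable<Long> ids, int modulus) {
        long n = 0;
        for (long id : ids) {
            if (sampled(id, modulus)) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        int modulus = 3673;
        // Replace with the real long IDs (e.g. read from a file).
        List<Long> ids = List.of(2111185186541130611L, 3174317317673160368L, 3174428972624599832L);
        for (long id : ids) {
            System.out.println(id + " hashCode=" + Long.toString(id).hashCode()
                    + " sampled=" + sampled(id, modulus));
        }
        System.out.println("sampled " + countSampled(ids, modulus) + " of " + ids.size());
    }
}
```

Running it separately over the user IDs and the item IDs shows directly whether either set yields zero sampled IDs for a given modulus.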

Explorer

Sean,

 

Yes, we tried that.

We took the long IDs of the 7.5 million users (yes, the long ID is the one Oryx generates by hashing), and about 2021 of them are 0 mod 3673. So it looks right; it's odd that they don't pass in Oryx. We have about 1200 items, and taking their long IDs mod 3673 gives us nothing (no item long ID is 0 mod 3673).

Some follow-up questions:

(1) The sampling process is separate for user IDs and item IDs, right?

(2) In my previous example, I used iterations #3 and #4. On second thought, I think the sampling should happen BEFORE iteration 1 starts, right? I notice there are several data pre-processing steps (e.g., MergeIDMappingStep). I assumed the sampling happened there (in MergeIDMappingStep) and the same sample IDs were then used across all iterations. So I'm confused that the hashCode log messages I provided appear in each reducer of each iteration. Can you explain a bit?

 

Thanks.

 

Master Collaborator

Sampling happens on every iteration. It has to record the current estimates for the same sampled users/items, and those change on each iteration. From the second iteration on, it can compare the current vs. previous sample estimates to assess convergence. Yes, the sampling function is the same for both users and items; it's all in the function above.

The next thing I'd check is the statistics from the MapReduce job that runs ConvergenceSampleFn. How many records went into the reducer, and how many came out? I assume 0 were emitted, but I'm wondering whether it's somehow running on just a small subset of the data. That would at least explain it, but I don't know if that's the case. You should see about 7.5M records going into the reducer, I believe.
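A minimal sketch of the comparison described above, assuming each iteration's sampled estimates are keyed by "user,item" strings (mirroring the comma-delimited output of ConvergenceSampleFn) and averaged as absolute differences over the pairs present in both iterations. The class and method names are hypothetical, and the fallback constant is copied from the "artificial convergence" log line in the question; exactly how Oryx applies that value is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

// ASSUMED convergence check on sampled estimates (not Oryx source): average
// |current - prior| over the (user,item) keys present in both iterations.
final class ConvergenceCheck {

    // Fallback used when nothing was sampled; value taken from the log line
    // "No samples for convergence; using artificial convergence value".
    static final double ARTIFICIAL_CONVERGENCE = 6.103515625E-5;

    static double avgAbsDifference(Map<String, Float> prior, Map<String, Float> current) {
        double total = 0.0;
        int samples = 0;
        for (Map.Entry<String, Float> e : current.entrySet()) {
            Float before = prior.get(e.getKey());
            if (before != null) {
                total += Math.abs(e.getValue() - before);
                samples++;
            }
        }
        // With zero samples there is nothing to compare, so fall back to the constant
        return samples == 0 ? ARTIFICIAL_CONVERGENCE : total / samples;
    }

    public static void main(String[] args) {
        Map<String, Float> prior = new HashMap<>();
        Map<String, Float> current = new HashMap<>();
        prior.put("u1,i1", 0.50f);
        current.put("u1,i1", 0.75f);
        prior.put("u2,i1", 1.00f);
        current.put("u2,i1", 0.75f);
        System.out.println("avg abs difference: " + avgAbsDifference(prior, current));
        System.out.println("no samples: " + avgAbsDifference(new HashMap<>(), new HashMap<>()));
    }
}
```

If the fallback is small compared with the convergence threshold, an empty sample makes the check report convergence immediately, which matches the "Converged" line that follows the artificial-convergence message in the question's log.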

Explorer

Sean,

 

Can you explain a bit where I can find that information?

I checked the status of one particular job (a Y job named "....0-3-Y-RowStep...") in the Hadoop UI. This job uses 30 reducers and failed to sample.

I saw the "Map-Reduce Framework" counters:

(1) Combine input records: all zeros in our case
(2) Combine output records: all zeros in our case
(3) Map input records: 1190
(4) Map output records: 1190
(5) Reduce input records: 1190
(6) Reduce output records: 0

(A) Where else should I check?

(B) I noticed "Reduce output records=0", which doesn't look normal. However, I also checked the job that uses 10 reducers and samples fine, and it also shows "Reduce output records=0". Thoughts?

 

Thanks.