Support Questions
Find answers, ask questions, and share your expertise

What is the best way to read a random line from hundreds of files millions of times in Spark?

I'm writing an iterative Spark application that needs to read a random line from hundreds of files and then aggregate the data in each iteration. The number of files is small: ~100, and each one is small in size: <1MB, but both will grow in the future. Each file has the exact same CSV schema and all of them live in the same directory in HDFS. In pseudo-code, the application would look like this:

for each trial in 1..1,000,000:
    total = 0
    for each file in all files:
        value = read a random line from file
        total += value
    done
    record total for this trial
done

I see the following possibilities to accomplish this in Spark:

  1. Execute ~1M iterations and in each one open ~100 files, read one line, and perform aggregation (the approach in pseudo-code above). This is simple, but very I/O intensive because there will be 1M * 100 calls to open a file in HDFS.
  2. Place the contents of each of the ~100 files into memory in the driver program and then broadcast that to each of the ~1M iterations. Each iteration would read a random line from ~100 in-memory objects, then aggregate the results. This is better, but each object has to be serialized and transferred over the network from the driver program.
  3. Create an external Hive table and in each iteration execute select queries to fetch a random row, then aggregate the results.
  4. Execute ~100 iterations and in each one open a single file and read ~1M lines from it at random. Each iteration would return a list of values ~1M long and all of the aggregation would be performed in the driver program.
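To make option #2 concrete, its core step (one trial drawing a random line from each in-memory file and summing) can be sketched in plain Java. This is only a sketch: the names are hypothetical, the Spark broadcast machinery is omitted, and each file's lines are assumed to be pre-parsed into numeric values.

```java
import java.util.Map;
import java.util.Random;

public class BroadcastSampling {
    // One trial: pick a uniformly random value from each file and sum them.
    // `fileLines` stands in for the broadcast variable's value:
    // filename -> parsed numeric values of that file's lines.
    static double runTrial(Map<String, double[]> fileLines, Random rand) {
        double total = 0.0;
        for (double[] lines : fileLines.values()) {
            total += lines[rand.nextInt(lines.length)];
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, double[]> files = Map.of(
            "a.csv", new double[]{1.0, 2.0, 3.0},
            "b.csv", new double[]{10.0, 20.0});
        Random rand = new Random(42);
        for (int trial = 0; trial < 5; trial++) {
            System.out.println(runTrial(files, rand));
        }
    }
}
```

In Spark, `fileLines` would come from a broadcast variable and each of the ~1M trials would call something like `runTrial` inside a map over a range RDD.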

What is the best approach?

1 ACCEPTED SOLUTION

Accepted Solutions

Guru

The rdd.sample approach should do this for you. Note that sample takes a fraction of the number of lines, so you may need to know the line count. It will also return only approximately one line for your fraction, so it may make sense to over-sample and then filter the small result set down to exactly one. Alternatively, use sampleByKeyExact (on a pair RDD), which may be a little slower, but will give you an exact count.

Given the many iterations, it may make sense to get all your samples in one RDD before iterating. Note that if you sample with withReplacement = true, you get the same effect as multiple runs of sample over the same set.

This approach is based on flattening your entire set of text files into a pair RDD of (filename: String, line: String) and then using sampleByKey.
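Outside Spark, that flattening step looks like the following minimal Java sketch (names are hypothetical; in Spark, a flatMap over wholeTextFiles would produce the same (filename, line) shape as an RDD):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FlattenLines {
    // Expand each (filename, fileContent) pair into one (filename, line)
    // pair per line -- the key/value shape that sampleByKey expects.
    static List<Map.Entry<String, String>> flatten(Map<String, String> files) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (Map.Entry<String, String> f : files.entrySet()) {
            for (String line : f.getValue().split("\n")) {
                pairs.add(new SimpleEntry<>(f.getKey(), line));
            }
        }
        return pairs;
    }
}
```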

You can also probably do the repeated sampling in local code (Scala here, but a similar approach works in Java/Python) as opposed to pure Spark:

import java.util.Random

val seed = 1234
val sampleCount = 1000

// note: `rand` is captured by the closures below; java.util.Random is
// serializable, but each task will get its own identically seeded copy
val rand = new Random(seed)
// (filename, fileContent) pairs, one entry per file
val files = sc.wholeTextFiles("*.txt")
// (filename, Array[line]) pairs
val filesAndLines = files.map(x => (x._1, x._2.split("\n")))
// for each file, draw sampleCount lines uniformly at random, with replacement
val manySamples = filesAndLines.map(x => (x._1, List.fill(sampleCount)(x._2(rand.nextInt(x._2.length)))))

Note here we are using java.util.Random, so the samples are only pseudo-random: Random.nextInt(bound) is uniform over the bound, but the underlying generator is a simple linear congruential one. If you need higher-quality randomness, use a different generator.

If the files were larger you would be better off doing this job in a more Spark way:

// fraction per file: expected samples per line; values > 1 are fine with replacement
val fractionsScaled = filesAndLines.map(x => (x._1, sampleCount / x._2.length.toDouble)).collect().toMap
// flatten to (filename, line) pairs, the shape sampleByKeyExact expects
val linesByFile = filesAndLines.flatMap(x => x._2.map(line => (x._1, line)))
linesByFile.sampleByKeyExact(withReplacement = true, fractions = fractionsScaled)

Note here I'm producing fractions greater than 1 wherever a file has fewer lines than the required number of samples, but since we're sampling with replacement, this effectively simulates taking a single sample (at fraction 1/lineCount) many times.

This also has the advantage that Spark's with-replacement sampler is Poisson-based, so it should give more uniform randomness.
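The fraction arithmetic above can be sketched in plain Java (no Spark; the names are hypothetical). With replacement, the fraction is simply the expected number of draws per line, so fraction * lineCount equals the requested sample count:

```java
import java.util.HashMap;
import java.util.Map;

public class SampleFractions {
    // Per-file fraction for sampleByKeyExact(withReplacement = true):
    // the expected number of draws per line. A fraction above 1.0 just
    // means we want more samples than the file has lines.
    static Map<String, Double> fractions(Map<String, Integer> lineCounts, int sampleCount) {
        Map<String, Double> out = new HashMap<>();
        for (Map.Entry<String, Integer> e : lineCounts.entrySet()) {
            out.put(e.getKey(), sampleCount / (double) e.getValue());
        }
        return out;
    }
}
```

For example, a 250-line file with sampleCount = 1000 gets fraction 4.0, i.e. each line is expected to be drawn four times.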


3 REPLIES


@Simon Elliston Ball

I think you are suggesting option #4; would you provide more detail of how that might work? If I understand the suggestion correctly, this would be the pseudo-code:

JavaPairRDD<String, Float> allSamples = null;
for (String fileName : fileNames) {
    JavaRDD<String> file = sc.textFile(fileName);
    // use a double fraction to avoid integer division truncating to zero
    JavaRDD<String> sample = file.sample(true, 1_000_000.0 / file.count());
    JavaPairRDD<String, Float> fileToSample = sample.mapToPair(x -> {
        Float importantElement = /* extract from line */
        return new Tuple2<>(fileName, importantElement);
    });
    // union returns a new RDD; reassign rather than discarding the result
    allSamples = (allSamples == null) ? fileToSample : allSamples.union(fileToSample);
}

At the end of this the allSamples RDD will be a 2D matrix with the rows representing each file (~100 rows) and the columns representing each iteration (~1M columns).

How do I perform aggregation to sum all elements in each column?

Guru

I've included more examples in the original answer, showing a more efficient method. Aggregation per column is probably really a different question; feel free to ask it if you still need help!