Created 01-06-2016 12:45 AM
I'm writing an iterative Spark application that needs to read a random line from hundreds of files and then aggregate the data in each iteration. The number of files is small: ~100, and each one is small in size: <1MB, but both will grow in the future. Each file has the exact same CSV schema and all of them live in the same directory in HDFS. In pseudo-code, the application would look like this:
for each trial in 1..1,000,000:
    val total = 0
    for file in all files:
        val = open file and read random line
        total += val
    done
    return total
done
I see the following possibilities to accomplish this in Spark:
What is the best approach?
Created 01-06-2016 09:24 AM
The rdd.sample approach should do this for you. Note that sample takes a fraction of the number of lines, so you may need to know the line count. It will also return only approximately one line for your fraction, so it may make sense to over-sample and then filter the small result set down to exactly one. Alternatively, use takeSample (or sampleByKeyExact on a pair RDD), which may be a little slower but gives you an exact count.
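As a minimal sketch of that approach for a single file (assuming a SparkContext sc and an HDFS path path; both are placeholders, not from your setup):

    val lines = sc.textFile(path)
    val lineCount = lines.count()

    // sample() takes a fraction, so over-sample a little and keep just one line
    val oneLine = lines
      .sample(withReplacement = false, fraction = 10.0 / lineCount, seed = 42)
      .take(1)

    // takeSample() collects an exact number of elements to the driver,
    // which is simpler when you only need a single line per file
    val exactlyOne = lines.takeSample(withReplacement = false, num = 1, seed = 42)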
Given the multiple iterations, it may make sense to get all your samples in one RDD before iterating. Note that if you sample with withReplacement = true, you get the same effect as multiple runs of sample over the same set.
This approach is based on flattening your entire set of text files into a pair RDD of (filename: String, line: String) and then using sampleByKey.
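A minimal sketch of that flattened layout (the path is just a placeholder for your HDFS directory), sampling roughly one line per file:

    // (fileName, fileContents) pairs, then flatten to (fileName, line) pairs
    val files = sc.wholeTextFiles("hdfs:///data/*.csv")
    val linesByFile = files.flatMapValues(_.split("\n"))

    // fraction = 1 / lineCount per file gives approximately one line per key
    // (could be 0 or 2, per the over-sampling note above)
    val lineCounts = linesByFile.countByKey()
    val fractions = lineCounts.map { case (file, n) => file -> (1.0 / n) }.toMap
    val roughlyOnePerFile = linesByFile.sampleByKey(withReplacement = false, fractions = fractions)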
You can also probably do the repeated sampling in local code (Scala here, but a similar approach works in Java / Python) as opposed to pure Spark:
import java.util.Random

val seed = 1234
val sampleCount = 1000
val rand = new Random(seed)

// (fileName, fileContents) for every file matching the pattern
val files = sc.wholeTextFiles("*.txt")

// (fileName, Array[line]) so each file's lines can be indexed directly
val filesAndLines = files.map(x => (x._1, x._2.split("\n")))

// For each file, pick sampleCount random lines (with replacement) locally
val manySamples = filesAndLines.map(x =>
  (x._1, List.fill(sampleCount)(x._2(rand.nextInt(x._2.length)))))
Note that this uses java.util.Random, so your samples are only pseudo-random: Random.nextInt(bound) draws approximately uniformly, but the underlying generator is a simple linear congruential one, so if you need stronger randomness use a different generator (e.g. java.security.SecureRandom, which is a drop-in subclass of java.util.Random).
If the files were larger, you would be better off doing this job in a more Spark-native way:
// Per-file fraction: the number of samples we want divided by that file's line count
val fractionsScaled = filesAndLines
  .map(x => (x._1, sampleCount / x._2.length.toDouble))
  .collect()
  .toMap

// Flatten (fileName, Array[line]) into (fileName, line) pairs for per-key sampling
val linesByFile = filesAndLines.flatMapValues(_.toSeq)

linesByFile.sampleByKeyExact(withReplacement = true, fractions = fractionsScaled)
Note that the fractions here can be greater than 1 when a file has fewer lines than the required number of samples (for example, 1,000 samples from a 200-line file gives a fraction of 5.0), but since we're sampling withReplacement, this effectively simulates taking a single sample many times at a fraction of 1/lineCount.
This also has the advantage that the Spark sampler with withReplacement = true is Poisson-based, so it should give more uniformly random results.
Created 01-07-2016 11:00 PM
I think you are suggesting option #4; could you provide more detail on how that might work? If I understand the suggestion correctly, this would be the pseudo-code:
JavaPairRDD<String, Float> allSamples = null;
for (String fileName : fileNames) {
    JavaRDD<String> file = sc.textFile(fileName);
    // sample() takes a fraction, so divide the desired sample count by the line count
    JavaRDD<String> sample = file.sample(true, 1000000.0 / file.count());
    JavaPairRDD<String, Float> fileToSample = sample.mapToPair(line -> {
        Float importantElement = /* extract from line */;
        return new Tuple2<>(fileName, importantElement);
    });
    // union() returns a new RDD, so reassign rather than discarding the result
    allSamples = (allSamples == null) ? fileToSample : allSamples.union(fileToSample);
}
At the end of this, the allSamples RDD will effectively be a 2D matrix, with rows representing each file (~100 rows) and columns representing each iteration (~1M columns).
How do I perform aggregation to sum all elements in each column?
Created 01-08-2016 03:41 PM
I've added more examples to the original answer, showing a more efficient method. Aggregation per column is probably really a different question; feel free to ask it if you still need help!