Best Approach - Operating on values of a Spark Pair RDD (discard key)

Super Collaborator

Hi All,

I need a recommendation on the best approach for solving the problem below. I have included the code snippet I have so far.

I read an HDFS file using a custom input format, which gives me a JavaPairRDD. I want to operate on the values one at a time; I am not concerned with the keys.

Is a Java List a scalable data structure to hold the values? Please have a look at the code below and suggest alternatives. Also, does the parallelize at the end of the code give any benefit?

// imports needed for this snippet
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
List<String> zeroValue = new ArrayList<String>();
// seqOp: folds each (key, value) pair into the running list, keeping only the transformed value
Function2<List<String>, Tuple2<LongWritable, BytesWritable>, List<String>> seqOp =
    new Function2<List<String>, Tuple2<LongWritable, BytesWritable>, List<String>>() {
      public List<String> call(List<String> valueList, Tuple2<LongWritable, BytesWritable> eachKeyValue) throws Exception {
        valueList.add(doWhatever(new String(eachKeyValue._2.copyBytes())));
        return valueList;
      }
      private String doWhatever(String string) {
        // will be an external utility method call; this is for representational purposes only
        return System.currentTimeMillis() + "-" + string;
      }
    };
// combOp: merges the per-partition lists
Function2<List<String>, List<String>, List<String>> combOp =
    new Function2<List<String>, List<String>, List<String>>() {
      public List<String> call(List<String> listOne, List<String> listTwo) throws Exception {
        listOne.addAll(listTwo);
        return listOne;
      }
    };
List<String> resultantList = fixedFileRdd.aggregate(zeroValue, seqOp, combOp);
JavaRDD<String> resultantRdd = jsc.parallelize(resultantList);
resultantRdd.saveAsTextFile("out-dir");

4 REPLIES

Super Collaborator

If your goal is simply to operate on the values one at a time, you can use the values() method of the JavaPairRDD to get a plain JavaRDD, on which you can then use map(). Something like this:

JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
JavaRDD<String> resultantRdd = fixedFileRdd.values().map(
    new Function<BytesWritable, String>() {
      public String call(BytesWritable i) {
        // do stuff
        return System.currentTimeMillis() + "-" + new String(i.copyBytes());
      }
    });
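As a side note, if you are on Java 8 (and a Spark version where the Java function types are lambda-friendly interfaces, i.e. Spark 1.0+), the same transformation can be written more compactly as a lambda — a sketch assuming the same getItSomeHow() placeholder:

JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
JavaRDD<String> resultantRdd = fixedFileRdd.values()
    .map(bw -> System.currentTimeMillis() + "-" + new String(bw.copyBytes()));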

Super Collaborator

Thanks @clukasik. That solves the problem. I was going around in circles trying to address this. And on the second part of the question: does it make any sense to parallelize a list before storing it to a file, as in the last two lines of my code?

Super Collaborator
@akeezhadat

Use parallelize() when you have a collection in the local JVM (the driver) that you want to split across the cluster. In your example, it would hurt performance to bring the RDD contents local (to the driver JVM) with aggregate and then push them back out to the cluster as a distributed dataset.
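For reference, here is a minimal sketch of the end-to-end pipeline with no driver round trip, assuming the getItSomeHow() and doWhatever() placeholders from the original post; the mapped values are written straight from the executors:

JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
fixedFileRdd.values()
    .map(new Function<BytesWritable, String>() {
      public String call(BytesWritable bytes) throws Exception {
        // doWhatever() stands in for the external utility call from the original post
        return doWhatever(new String(bytes.copyBytes()));
      }
    })
    .saveAsTextFile("out-dir"); // each partition is written in parallel; no collect/parallelize needed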

Super Collaborator

Thanks @clukasik. Got it!!