Created 06-06-2016 01:21 PM
Hi All,
I need a recommendation on the best approach to the problem below. I have included the code I have written so far.
I read an HDFS file using a custom input format, which gives me a JavaPairRDD. I want to operate on the values one at a time; I don't care about the keys.
Is a Java List a scalable data structure for holding the values? Please take a look at the code below and suggest alternatives. Also, does the parallelize call at the end of the code give any benefit?
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
List<String> zeroValue = new ArrayList<String>();
// seqOp: append each decoded value to the partition's list
Function2<List<String>, Tuple2<LongWritable, BytesWritable>, List<String>> seqOp =
    new Function2<List<String>, Tuple2<LongWritable, BytesWritable>, List<String>>() {
        public List<String> call(List<String> valueList,
                                 Tuple2<LongWritable, BytesWritable> eachKeyValue) throws Exception {
            valueList.add(doWhatever(new String(eachKeyValue._2().copyBytes())));
            return valueList;
        }
        private String doWhatever(String string) {
            // will be an external utility method call, this is for representational purpose only
            return System.currentTimeMillis() + "-" + string;
        }
    };
// combOp: concatenate the per-partition lists
Function2<List<String>, List<String>, List<String>> combOp =
    new Function2<List<String>, List<String>, List<String>>() {
        public List<String> call(List<String> listOne, List<String> listTwo) throws Exception {
            listOne.addAll(listTwo);
            return listOne;
        }
    };
List<String> resultantList = fixedFileRdd.aggregate(zeroValue, seqOp, combOp);
JavaRDD<String> resultantRdd = jsc.parallelize(resultantList);
resultantRdd.saveAsTextFile("out-dir");
Created 06-06-2016 02:09 PM
If your goal is simply to operate on the values one at a time, you can use the values() method of the JavaPairRDD to get a plain JavaRDD and then call map on it. Something like this:
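import org.apache.spark.api.java.function.Function;
JavaPairRDD<LongWritable, BytesWritable> fixedFileRdd = getItSomeHow();
JavaRDD<String> resultantRdd = fixedFileRdd.values().map(
    new Function<BytesWritable, String>() {
        public String call(BytesWritable i) {
            // do stuff
            return System.currentTimeMillis() + "-" + new String(i.copyBytes());
        }
    });
// saved directly from the cluster, no collect/parallelize round trip
resultantRdd.saveAsTextFile("out-dir");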
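One pitfall worth spelling out: BytesWritable.copyBytes() returns a byte[], and concatenating a byte[] onto a String calls Object.toString() on the array, which yields its type and hash code (something like [B@6d06d69c) rather than its contents. Below is a stdlib-only sketch of the per-value conversion; the class ValueDecoding and helper formatValue are hypothetical names, and UTF-8 input is an assumption since the thread doesn't state the file's encoding.

```java
import java.nio.charset.StandardCharsets;

public class ValueDecoding {

    // Hypothetical helper, not from the thread: builds the "timestamp-payload" line
    // for one record value. ASSUMPTION: the value bytes are UTF-8 encoded.
    public static String formatValue(long timestampMillis, byte[] valueBytes) {
        return timestampMillis + "-" + new String(valueBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);

        // Pitfall: string concatenation calls Object.toString() on the array,
        // so this holds the type and hash code (e.g. "1-[B@..."), not "1-hello".
        String wrong = 1L + "-" + raw;

        // Decoding the bytes explicitly yields the actual payload.
        String right = formatValue(1L, raw);

        System.out.println(wrong);
        System.out.println(right); // prints 1-hello
    }
}
```

Passing an explicit charset also avoids depending on the JVM's platform default, which is what new String(byte[]) uses.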
Created 06-06-2016 02:16 PM
Thanks @clukasik. That solves the problem; I was going around in an unnecessary circle to address this. Also, on the second part of the question: does it make any sense to parallelize a list before storing it to a file, as in the last two lines of my code?
Created 06-06-2016 02:25 PM
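- Use parallelize when you have a collection in the local JVM (the driver) that you want to distribute across the cluster. In your example it would actually hurt: your aggregate call builds a List of every value inside the driver JVM, which must hold them all in memory, and parallelize then pushes them back out to the cluster as a distributed dataset. Mapping over values() and calling saveAsTextFile on the result avoids that round trip.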
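To make the driver-side cost concrete, here is a plain-Java simulation (no Spark involved; the in-memory "partitions" and the class name AggregateSimulation are hypothetical) of the seqOp/combOp contract that aggregate follows: each partition is folded with seqOp starting from a fresh zero value, and the partial results are then merged with combOp in a single place, which in Spark is the driver. With list-building functions shaped like the question's, the merged result holds every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class AggregateSimulation {

    // Hypothetical local stand-in for RDD.aggregate: fold each "partition" with
    // seqOp from a fresh zero value, then merge the partial results with combOp.
    // In Spark the per-partition folds run on executors; the merge runs on the driver.
    public static <T, U> U aggregate(List<List<T>> partitions, Supplier<U> zero,
                                     BiFunction<U, T, U> seqOp, BiFunction<U, U, U> combOp) {
        U result = zero.get();                  // driver-side accumulator
        for (List<T> partition : partitions) {
            U partial = zero.get();             // fresh zero value per partition
            for (T element : partition) {
                partial = seqOp.apply(partial, element);
            }
            result = combOp.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));

        // Same shape as the question's seqOp/combOp: append to a list, then concatenate lists.
        Supplier<List<String>> zeroValue = ArrayList::new;
        BiFunction<List<String>, String, List<String>> seqOp =
                (valueList, value) -> { valueList.add(value.toUpperCase()); return valueList; };
        BiFunction<List<String>, List<String>, List<String>> combOp =
                (listOne, listTwo) -> { listOne.addAll(listTwo); return listOne; };

        List<String> resultantList = aggregate(partitions, zeroValue, seqOp, combOp);
        System.out.println(resultantList); // prints [A, B, C, D, E]
    }
}
```

Spark merges partition results as tasks finish, so unlike this loop the real merge order is not guaranteed; either way the whole list ends up in one JVM's memory.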
Created 06-06-2016 02:28 PM
Thanks @clukasik. Got it!!