Java: Read and Write Spark Vectors to HDFS

Rising Star

I wrote Vectors (org.apache.spark.mllib.linalg.Vector) to HDFS as follows:

public void writePointsToFile(Path path, FileSystem fs, Configuration conf,
        List<Vector> points) throws IOException {

    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            Writer.file(path), Writer.keyClass(LongWritable.class),
            Writer.valueClass(Vector.class));

    long recNum = 0;

    for (Vector point : points) {
        writer.append(new LongWritable(recNum++), point);
    }
    writer.close();
}

(I'm not sure this is the right way to do it; I can't test it yet.)

Now I need to read this file back as a JavaRDD<Vector>, because I want to use it with Spark's K-means clustering, but I don't know how to do this.

1 ACCEPTED SOLUTION

Super Collaborator

Try saving the RDD to disk with saveAsObjectFile; you can then load it back with objectFile():

vectorListRdd.saveAsObjectFile("<path>")
val fileRdd = sc.objectFile[Vector]("<path>")


REPLIES

Rising Star

@Arun A K Thank you for your answer. First, I have Vectors, not an RDD.

Second, I am using Java.

Super Collaborator

I assumed you could write the file to HDFS from a Spark application, and that you had a list of Vectors from which you could build an RDD by calling parallelize:

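import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable.ListBuffer

val sc = new SparkContext("local[1]", "Sample App")
val v1: Vector = Vectors.dense(2.0, 3.0, 4.0)
val v2: Vector = Vectors.dense(5.0, 6.0, 7.0)
val list = new ListBuffer[Vector]()
list += v1
list += v2
// Turn the local list into an RDD and save it as an object file
val listRdd = sc.parallelize(list)
listRdd.saveAsObjectFile("localFile")
// Read it back into an RDD of Vector in another application
val fileRdd = sc.objectFile[Vector]("localFile")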

The same methods are available in the Java API: parallelize and objectFile on JavaSparkContext, and saveAsObjectFile on JavaRDD.
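Since the question was about Java, here is a rough Java sketch of the same round trip. It assumes spark-core and spark-mllib are on the classpath. The class name VectorObjectFileExample, the helpers writePoints and readPoints, the temporary output path, and the K-means settings (k = 2, 10 iterations) are all made up for illustration and are not from the posts above.

```java
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class VectorObjectFileExample {

    // Saves the vectors as a Spark object file (a directory of part files).
    // The target path must not exist yet; Spark will not overwrite it.
    static void writePoints(JavaSparkContext jsc, List<Vector> points, String path) {
        jsc.parallelize(points).saveAsObjectFile(path);
    }

    // Loads the vectors back. The element type is taken on trust from the
    // caller; nothing verifies it when the RDD is created.
    static JavaRDD<Vector> readPoints(JavaSparkContext jsc, String path) {
        return jsc.objectFile(path);
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext jsc = new JavaSparkContext("local[1]", "Sample App");
        try {
            List<Vector> points = Arrays.asList(
                    Vectors.dense(2.0, 3.0, 4.0),
                    Vectors.dense(5.0, 6.0, 7.0));

            // Hypothetical local output location; on a cluster this would
            // be an HDFS path instead.
            String path = Files.createTempDirectory("vectors")
                    .resolve("points").toString();

            writePoints(jsc, points, path);
            JavaRDD<Vector> loaded = readPoints(jsc, path);

            // KMeans.train takes a Scala RDD, hence the .rdd() conversion.
            KMeansModel model = KMeans.train(loaded.rdd(), 2, 10);
            System.out.println("Loaded " + loaded.count() + " vectors, "
                    + model.clusterCenters().length + " cluster centers");
        } finally {
            jsc.stop();
        }
    }
}
```

The write and read steps can live in separate applications, as in the Scala snippet; they only share the path.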