Java: Read and Write Spark Vectors to HDFS

Rising Star

I wrote Vectors (org.apache.spark.mllib.linalg.Vector) to HDFS as follows:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.spark.mllib.linalg.Vector;

public void writePointsToFile(Path path, FileSystem fs, Configuration conf,
        List<Vector> points) throws IOException {

    // (Vector is not a Hadoop Writable, so I'm not sure the
    // SequenceFile writer will accept it as a value class.)
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            Writer.file(path), Writer.keyClass(LongWritable.class),
            Writer.valueClass(Vector.class));

    long recNum = 0;
    for (Vector point : points) {
        writer.append(new LongWritable(recNum++), point);
    }
    writer.close();
}

(I'm not sure this is the right way to do it; I can't test it yet.)

Now I need to read this file back as a JavaRDD&lt;Vector&gt; because I want to use it with Spark's k-means clustering, but I don't know how to do this.

1 ACCEPTED SOLUTION

Super Collaborator

Try storing the RDD to disk using saveAsObjectFile(); you can then read it back with objectFile():

vectorListRdd.saveAsObjectFile("<path>")
val fileRdd = sc.objectFile[Vector]("<path>")


3 REPLIES


Rising Star

@Arun A K, thank you for your answer. First, I have Vectors, not an RDD.

Second, I am using Java.

Super Collaborator

I thought you had the option of writing the file to HDFS from a Spark application, so I assumed you had a List of Vectors from which you could build an RDD by calling parallelize:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable.ListBuffer

val sc = new SparkContext("local[1]", "Sample App")
val v1: Vector = Vectors.dense(2.0, 3.0, 4.0)
val v2: Vector = Vectors.dense(5.0, 6.0, 7.0)
val list = new ListBuffer[Vector]()
list += v1
list += v2
val listRdd = sc.parallelize(list)
listRdd.saveAsObjectFile("localFile")

// read it back to an RDD of Vector in another application
val fileRdd = sc.objectFile[Vector]("localFile")

These methods are available in JavaSparkContext and JavaRDD
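Since the question is about Java, here is a minimal sketch of the same round trip using JavaSparkContext and JavaRDD. The class name, method name, and output path below are illustrative (not from the thread); saveAsObjectFile() and objectFile() are the real Spark API calls:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class VectorObjectFileExample {

    // Write a list of vectors as an object file, then read it back.
    // The path argument can be a local or HDFS path.
    public static List<Vector> roundTrip(JavaSparkContext sc, String path) {
        List<Vector> points = Arrays.asList(
                Vectors.dense(2.0, 3.0, 4.0),
                Vectors.dense(5.0, 6.0, 7.0));

        // Turn the local list into an RDD and persist it with Java serialization
        JavaRDD<Vector> vectorRdd = sc.parallelize(points);
        vectorRdd.saveAsObjectFile(path);

        // objectFile() deserializes the same objects back into an RDD
        JavaRDD<Vector> restored = sc.objectFile(path);
        return restored.collect();
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[1]").setAppName("Sample App"));
        System.out.println(roundTrip(sc, "vectors.obj"));
        sc.stop();
    }
}
```

The resulting JavaRDD&lt;Vector&gt; can then be passed straight to KMeans.train() (after caching it, since k-means is iterative).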