Created 08-15-2022 10:14 PM
I am new to Apache Spark and am trying to run a custom nearest-neighbor algorithm on an RDD that has been split into 2 partitions using a custom partitioner. The JavaPairRDD contains the graph details and the random objects created on the graph.
My approach is to build a subgraph for each partition and then run the custom algorithm on each subgraph. It seems to work, although not entirely correctly. I am not sure whether this is the correct way to apply an action to each partition. My code is below; comments and suggestions are highly appreciated.
// <Partition_Index_Key, Map<Source_vertex, Map<Destination Vertex, Tuple2<Edge_Length, ArrayList of Random Objects>>
JavaPairRDD<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> adjVertForSubgraphsRDD = jscontext
.partitionBy(new CustomPartitioner(CustomPartitionSize));
//applying foreachPartition action on JavaPairRDD
new VoidFunction<Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>>>() {
private static final long serialVersionUID = 1L;
public void call(
Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>> tupleRow)
throws Exception {
int sourceVertex;
int destVertex;
double edgeLength;
int roadObjectId;
boolean roadObjectType;
double distanceFromStart;
CoreGraph subgraph0 = new CoreGraph();
CoreGraph subgraph1 = new CoreGraph();
while (tupleRow.hasNext()) {
Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>> newMap =
if ((Integer.parseInt(String.valueOf( == 0)) {
for (Object srcVertex : newMap.keySet()) {
for (Object dstVertex : newMap.get(srcVertex).keySet()) {
if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
destVertex = Integer.parseInt(String.valueOf(dstVertex));
edgeLength = newMap.get(srcVertex).get(dstVertex)._1();
subgraph0.addEdge(sourceVertex, destVertex, edgeLength);
for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
.size(); i++) {
int currentEdgeId = subgraph0.getEdgeId(sourceVertex, destVertex);
roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
RoadObject rn0 = new RoadObject();
subgraph0.addObjectOnEdge(currentEdgeId, rn0);
} else {
sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
destVertex = Integer.parseInt(String.valueOf(dstVertex));
edgeLength = newMap.get(srcVertex).get(dstVertex)._1();
subgraph0.addEdge(sourceVertex, destVertex, edgeLength);
} else if ((Integer.parseInt(String.valueOf( == 1)) {
for (Object srcVertex : newMap.keySet()) {
for (Object dstVertex : newMap.get(srcVertex).keySet()) {
if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
destVertex = Integer.parseInt(String.valueOf(dstVertex));
edgeLength = newMap.get(srcVertex).get(dstVertex)._1();
subgraph1.addEdge(sourceVertex, destVertex, edgeLength);
for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
.size(); i++) {
int currentEdgeId = subgraph1.getEdgeId(sourceVertex, destVertex);
roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
RoadObject rn1 = new RoadObject();
subgraph1.addObjectOnEdge(currentEdgeId, rn1);
} else {
sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
destVertex = Integer.parseInt(String.valueOf(dstVertex));
edgeLength = newMap.get(srcVertex).get(dstVertex)._1();
subgraph1.addEdge(sourceVertex, destVertex, edgeLength);
// Straight forward nearest neighbor algorithm from each true to false.
ANNNaive ann = new ANNNaive();
Map<Integer, Integer> nearestNeighorPairsSubg0 = ann.compute(subgraph0, true);
System.out.println("for subgraph0");
Map<Integer, Integer> nearestNeighorPairsSubg1 = ann.compute(subgraph1, true);
System.out.println("for subgraph1");