Support Questions

Find answers, ask questions, and share your expertise

How does a foreachPartition for a JavaPairRDD works in sparks?

avatar
New Contributor

I am new to apache spark and am trying to run a custom nearest neighbor algorithm on an RDD that has been partitioned into 2 parts using a custom partitioner. The JavaPairRDD contains the graph details and the random object created on the graph.

Tinygraph and RoadObjects.jpgTinygraph Partition.jpg

According to my logic, I am building subgraphs for each partition, and I am running a custom algorithm on each subgraph. It seems to be working "although not properly". I am not sure if this is the correct way to apply action in each partition. I am adding my code. Comments and suggestions are highly appreciated.

 

 

 

// <Partition_Index_Key, Map<Source_vertex, Map<Destination Vertex, Tuple2<Edge_Length, ArrayList of Random Objects>>
			JavaPairRDD<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> adjVertForSubgraphsRDD = jscontext
					.parallelizePairs(adjacentVerticesForSubgraphs)
					.partitionBy(new CustomPartitioner(CustomPartitionSize));

			//applying foreachPartition action on JavaPairRDD
			adjVertForSubgraphsRDD.foreachPartition(
					new VoidFunction<Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>>>() {

						/**
						 * 
						 */
						private static final long serialVersionUID = 1L;

						@Override
						public void call(
								Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>> tupleRow)
								throws Exception {
							int sourceVertex;
							int destVertex;
							double edgeLength;

							int roadObjectId;
							boolean roadObjectType;
							double distanceFromStart;

							CoreGraph subgraph0 = new CoreGraph();
							CoreGraph subgraph1 = new CoreGraph();

							while (tupleRow.hasNext()) {
								

								Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>> newMap = tupleRow.next()
										._2();

								if ((Integer.parseInt(String.valueOf(tupleRow.next()._1())) == 0)) {

									for (Object srcVertex : newMap.keySet()) {

										for (Object dstVertex : newMap.get(srcVertex).keySet()) {
											if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
												sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
												destVertex = Integer.parseInt(String.valueOf(dstVertex));
												edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

												subgraph0.addEdge(sourceVertex, destVertex, edgeLength);

												for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
														.size(); i++) {
													int currentEdgeId = subgraph0.getEdgeId(sourceVertex, destVertex);

													roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getObjectId();
													roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getType();
													distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getDistanceFromStartNode();
													RoadObject rn0 = new RoadObject();
													rn0.setObjId(roadObjectId);
													rn0.setType(roadObjectType);
													rn0.setDistanceFromStartNode(distanceFromStart);

													subgraph0.addObjectOnEdge(currentEdgeId, rn0);
												}
											} else {
												sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
												destVertex = Integer.parseInt(String.valueOf(dstVertex));
												edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

												subgraph0.addEdge(sourceVertex, destVertex, edgeLength);
											}

										}
									}

								} else if ((Integer.parseInt(String.valueOf(tupleRow.next()._1())) == 1)) {

									for (Object srcVertex : newMap.keySet()) {
										for (Object dstVertex : newMap.get(srcVertex).keySet()) {
											if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
												sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
												destVertex = Integer.parseInt(String.valueOf(dstVertex));
												edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

												subgraph1.addEdge(sourceVertex, destVertex, edgeLength);

												for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
														.size(); i++) {
													int currentEdgeId = subgraph1.getEdgeId(sourceVertex, destVertex);

													roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getObjectId();
													roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getType();
													distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
															.getDistanceFromStartNode();
													RoadObject rn1 = new RoadObject();
													rn1.setObjId(roadObjectId);
													rn1.setType(roadObjectType);
													rn1.setDistanceFromStartNode(distanceFromStart);

													subgraph1.addObjectOnEdge(currentEdgeId, rn1);
												}
											} else {
												sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
												destVertex = Integer.parseInt(String.valueOf(dstVertex));
												edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

												subgraph1.addEdge(sourceVertex, destVertex, edgeLength);
											}

										}
									}
								}

							}
							// Straight forward nearest neighbor algorithm from each true to false.
							ANNNaive ann = new ANNNaive();
							System.err.println("-------------------------------");
							Map<Integer, Integer> nearestNeighorPairsSubg0 = ann.compute(subgraph0, true);
							System.out.println("for subgraph0");
							System.out.println(nearestNeighorPairsSubg0);
							System.err.println("-------------------------------");

							System.err.println("-------------------------------");
							Map<Integer, Integer> nearestNeighorPairsSubg1 = ann.compute(subgraph1, true);
							System.out.println("for subgraph1");
							System.out.println(nearestNeighorPairsSubg1);
							System.err.println("-------------------------------");

						}
					});

 

 

 

 

 

0 REPLIES 0