Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Mapreduce java program to search QuadTree index and also run GeometryEngine.contains to confirm point in polygon using wkt file

Highlighted

Mapreduce java program to search QuadTree index and also run GeometryEngine.contains to confirm point in polygon using wkt file

New Contributor

This post is a map reduce implementation suggested for my previous question: "How to optimize scan of 1 huge file / table in Hive to confirm/check if lat long point is contained ..."

I am not well-versed in writing java programs for map-reduce and I mainly use Hive or Pig or spark to develop in Hadoop eco-system. To give a background of task at hand: I am trying to associate every latitude/longitude ping to corresponding ZIP postal code. I have a WKT multi-polygon shape file (500 MB) with all the zip information. I have loaded this in Hive and can do a join using ST_Contains(polygon, point). However, it takes very long to complete. To over come this bottle neck I am trying to leverage the example in ESRI ("https://github.com/Esri/gis-tools-for-hadoop/tree/master/samples/point-in-polygon-aggregation-mr") by building a quad tree index for searching a point derived from lat-long in polygon.

I have managed to write the code and it clogs up the Java heap memory of the cluster. Any suggestions on improving the code or looking at a different approach will be greatly appreciated:

Error message: Error: Java heap space Container killed by the ApplicationMaster.

Container killed on request.

Exit code is 143 Container exited with a non-zero exit code 143

public class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {

    // column indices for values in the text file
    int longitudeIndex;
    int latitudeIndex;
    int wktZip; 
    int wktGeom;
    int wktLineCount;
    int wktStateID;

    // in boundaries.wkt, the label for the polygon is "wkt"
    //creating ArrayList to hold details of the file
    ArrayList<ZipPolyClass> nodes = new ArrayList<ZipPolyClass>();

    String labelAttribute;
    EsriFeatureClass featureClass;
    SpatialReference spatialReference;
    QuadTree quadTree;
    QuadTreeIterator quadTreeIter;
    BufferedReader csvWkt;

    // class to store all the values from wkt file and calculate geometryFromWKT 
    public class ZipPolyClass {

        public String zipCode;
        public String wktPoly;
        public String stateID;
        public int indexJkey;
        public Geometry wktGeomObj; 

        public ZipPolyClass(int ijk, String z, String w, String s ){
            zipCode = z;
            wktPoly = w;
            stateID = s;
            indexJkey = ijk;
            wktGeomObj = GeometryEngine.geometryFromWkt(wktPoly, 0, Geometry.Type.Unknown);
        }

    }


    //building quadTree Index from WKT multiPolygon and creating an iterator
    private void buildQuadTree(){
        quadTree = new QuadTree(new Envelope2D(-180, -90, 180, 90), 8);

        Envelope envelope = new Envelope();

        int j=0;

        while(j<nodes.size()){
            nodes.get(j).wktGeomObj.queryEnvelope(envelope);
            quadTree.insert(j, new Envelope2D(envelope.getXMin(), envelope.getYMin(), envelope.getXMax(), envelope.getYMax()));
        }

        quadTreeIter = quadTree.getIterator();
    }


    /**
     * Query the quadtree for the feature containing the given point
     * 
     * @param pt point as longitude, latitude
     * @return index to feature in featureClass or -1 if not found
     */
    private int queryQuadTree(Point pt)
    {
        // reset iterator to the quadrant envelope that contains the point passed
        quadTreeIter.resetIterator(pt, 0);

        int elmHandle = quadTreeIter.next();

        while (elmHandle >= 0){
            int featureIndex = quadTree.getElement(elmHandle);

            // we know the point and this feature are in the same quadrant, but we need to make sure the feature
            // actually contains the point
            if (GeometryEngine.contains(nodes.get(featureIndex).wktGeomObj, pt, spatialReference)){
                return featureIndex;
            }

            elmHandle = quadTreeIter.next();
        }

        // feature not found
        return -1;
    }


    /**
     * Sets up mapper with filter geometry provided as argument[0] to the jar
     */
    @Override
    public void setup(Context context)
    {
        Configuration config = context.getConfiguration();

        spatialReference = SpatialReference.create(4326);

        // first pull values from the configuration     
        String featuresPath = config.get("sample.features.input");
        //get column reference from driver class 
        wktZip = config.getInt("sample.features.col.zip", 0);
        wktGeom = config.getInt("sample.features.col.geometry", 18);
        wktStateID = config.getInt("sample.features.col.stateID", 3);
        latitudeIndex = config.getInt("samples.csvdata.columns.lat", 5);
        longitudeIndex = config.getInt("samples.csvdata.columns.long", 6);

        FSDataInputStream iStream = null;

        try {
            // load the text WKT file provided as argument 0
            FileSystem hdfs = FileSystem.get(config);
            iStream = hdfs.open(new Path(featuresPath));
            BufferedReader br = new BufferedReader(new InputStreamReader(iStream));
            String wktLine ;
            int i=0;

            while((wktLine = br.readLine()) != null){
                String [] val = wktLine.split("\\|");
                String qtZip = val[wktZip];
                String poly = val[wktGeom];
                String stID = val[wktStateID];
                ZipPolyClass zpc = new ZipPolyClass(i, qtZip, poly, stID);
                nodes.add(i,zpc);
                i++; // increment in the loop before end
                }

        } 
        catch (Exception e)
        {
            e.printStackTrace();
        } 
        finally
        {
            if (iStream != null)
            {
                try {
                    iStream.close();
                } catch (IOException e) { }
            }
        }

        // build a quadtree of our features for fast queries
        if (!nodes.isEmpty()) {
            buildQuadTree();
        }
    }

    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {

        /* 
         * The TextInputFormat we set in the configuration, by default, splits a text file line by line.
         * The key is the byte offset to the first character in the line.  The value is the text of the line.
         */

        String line = val.toString();
        String [] values = line.split(",");

        // get lat long from file and convert to float
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);

        // Create our Point directly from longitude and latitude
        Point point = new Point(longitude, latitude);


        int featureIndex = queryQuadTree(point);

        // Each map only processes one record at a time, so we start out with our count 
                // as 1. Since we have a distinct record file we will not run reducer
                IntWritable one = new IntWritable(1);

        if (featureIndex >= 0){

            String zipTxt =nodes.get(featureIndex).zipCode;
            String stateIDTxt = nodes.get(featureIndex).stateID;
            String latTxt = values[latitudeIndex];
            String longTxt = values[longitudeIndex];
            String pointTxt = point.toString();
            String name;
            name = zipTxt+"\t"+stateIDTxt+"\t"+latTxt+"\t"+longTxt+ "\t" +pointTxt;

            context.write(new Text(name), one);
        } else {
            context.write(new Text("*Outside Feature Set"), one);
        }
    }
}
Don't have an account?
Coming from Hortonworks? Activate your account here