Created on 12-03-2015 06:13 PM
The objective of this article is to introduce the ESRI Spatial Framework for Hadoop and demonstrate how to use it with HDP 2.3.2. The following is a high-level view of how we will accomplish this:
Prior to continuing the tutorial, please ensure you have downloaded and started the HDP Sandbox.
The ESRI Spatial Framework for Hadoop is a collection of Hive UDFs that allow users to perform complex spatial analysis directly in Hive. The framework has built-in support for representing geometric shapes (point, polygon, etc.) as well as functions that operate on these shapes. For example, users can perform a binary test for overlap between a pair of polygons or compute the geometry of the intersection. This framework provides a powerful method for stitching together datasets with geospatial features that, otherwise, may not have been able to be correlated.
The framework itself is open source so we need to clone the repository and build its dependencies in order to use it. This can be done as follows:
git clone https://github.com/Esri/geometry-api-java.git cd geometry-api-java mvn clean install cd .. git clone https://github.com/Esri/geometry-api-java.git cd spatial-framework-for-hadoop mvn clean package
The relevant libraries that we will use later are geometry-api-java/target/esri-geometry-api-1.2.1.jar and spatial-framework-for-hadoop/hive/target/spatial-sdk-hive-1.1.1-SNAPSHOT.jar. Please note there locations for later use.
The data we will use is sourced from the Open Baltimore Data portal. I have linked (in the next section) data that I have already obtained and prepped but I include the steps in this section for completeness. There is no need to perform these steps if you use the attached files in which case you can proceed directly to the Importing Data to Hive section.
The data relating to the 2010 census can be exported as an ESRI Shapefile from here. This includes a breakdown of demographics and, more importantly, shape data for the polygon that represents each neighborhood. The export zip archive contains several files but the *.shp file is the one we are after.
The spatial framework supports a couple JSON formats so we need to convert the Shapefile before we can use it. On your sandbox, you can convert convert this Shapefile to GeoJson format with the following steps:
yum install gdal ogr2ogr -f "Geojson" -t_srs "WGS84" 2010_census.json 2010_Census_Profile_by_Neighborhood_Statistical_Areas.shp
Now, delete the first 3 lines and the last 2 lines of the file so it starts with: { "type": "Feature", "properties": ...
This is the file that is attached here as 2010_census.json. We will load this into Hive in the next step.
The crime data can be exported as a CSV file from here. There is no prep needed for this file. It is also attached as Crime_Data.csv
This section will walk through how to import the attached 2010_census.json and Crime_Data.csv files into HDFS and overlay a schema on them via Hive. The data will remain as-is on disk and the schema will be applied when the table is read.
Transfer the two files to the sandbox via SCP or your preferred method. Then put them into the HDFS tmp directory:
hdfs dfs -put 2010_census.json /tmp hdfs dfs -put Crime_Data.csv /tmp
Note that the 2.3.2 sandbox does not have a directory in HDFS for the root user and this can cause Hive queries to fail. To remedy this you need to issue the following commands as root:
su hdfs hdfs dfs -mkdir /user/root hdfs dfs -chown root:hdfs /user/root exit
You will need the most recent version of the Hive JSON serde as well. You can get it as follows:
wget https://github.com/sheetaldolas/Hive-JSON-Serde/a... unzip json-serde-1.1.9.8.zip
From Hive, you can now create the corresponding crimes and census tables and ingest them into a more optimized ORC format.
We can overlay a schema on the Census JSON data suing a Hive JSON Serde. The Serde is responsible for telling Hive how to read and write in JSON format. Notice that the schema maps directly to the JSON structure you see in 2010_census.json.
add jar Hive-JSON-Serde-json-serde-1.1.9.8/dist/json-serde-1.1.9.2-Hive13-jar-with-dependencies.jar; CREATE TABLE census_text ( type string, properties map<string,string>, geometry string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE; LOAD DATA INPATH '/tmp/2010_census.json' OVERWRITE INTO TABLE census_text;
Nest we will create an optimized ORC backed table and import the Census data. This process will leverage the JSON Serde to apply a schema when the JSON is read and then we will extract specific properties from the JSON and promote them to first-class fields in the ORC table. This will allow us to leverage predicate pushdown features of ORC and produce more efficient queries when these fields are used as conditions. Notice that the geometry is left as a plain string -- the spatial framework will read this at query time.
set hive.execution.engine=mr; -- A simple schema with several features promoted to fields in the table. CREATE TABLE census_orc ( name string, population double, male double, female double, age_0_4 double, age_5_11 double, age_12_14 double, age_15_17 double, age_18_24 double, age_25_34 double, age_35_44 double, age_45_64 double, age_65_ovr double, vacant double, occupied double, geometry string ) STORED AS ORC; -- We are casting many features from string to a more appropriate type of double as we -- ingest into the ORC table. INSERT INTO TABLE census_orc select properties['name'], cast(properties['population'] as double), cast(properties['Male'] as double), cast(properties['Female'] as double), cast(properties['AGE0_4'] as double), cast(properties['AGE5_11'] as double), cast(properties['AGE12_14'] as double), cast(properties['AGE15_17'] as double), cast(properties['AGE18_24'] as double), cast(properties['AGE25_34'] as double), cast(properties['AGE35_44'] as double), cast(properties['AGE45_65'] as double), cast(properties['AGE65ovr'] as double), cast(properties['Vacant'] as double), cast(properties['Occupied'] as double), geometry from census_text where geometry != 'NULL';
CREATE TABLE crimes_text(crime_date string, code string, location string, description string, weapon string, post string, district string, neighborhood string, coordinates string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' TBLPROPERTIES ("skip.header.line.count"="1"); LOAD DATA INPATH '/tmp/Crime_Data.csv' OVERWRITE INTO TABLE crimes_text; -- Basic schema for and ORC backed table for crimes CREATE TABLE crimes_orc(crime_date string, code string, location string, description string, weapon string, post string, district string, neighborhood string, latitude double, longitude double) STORED AS ORC; -- load the plain CSV data into an optimized ORC file. The coordinates field is split -- and the individual latitude and longitude is extracted into separate fields INSERT INTO TABLE crimes_orc SELECT crime_date, code, location, description, weapon, post, district, neighborhood, cast(substr(split(coordinates, ',')[0], 2) as double), cast(split(split(coordinates, ',')[1], '\\)')[0] as double) FROM crimes_text;
Now that we have done all of the work to set up the framework and get the data prepped and ready we can finally run a couple sample queries. We will add the ESRI jars and make some simple aliases to the spatial functions that we need as follows:
add jar geometry-api-java/target/esri-geometry-api-1.2.1.jar; add jar spatial-framework-for-hadoop/hive/target/spatial-sdk-hive-1.1.1-SNAPSHOT.jar; create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point'; create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains'; create temporary function ST_GeomFromGeoJson as 'com.esri.hadoop.hive.ST_GeomFromGeoJson';
One quick query we can run is to map crimes to a neighborhood via the ST_Contains relationship test and count the total per neighborhood.
set hive.vectorized.execution.enabled = false; set hive.execution.engine=tez; select census_orc.name, count(*) cnt FROM census_orc JOIN crimes_orc WHERE crimes_orc.description="HOMICIDE" and ST_Contains(ST_GeomFromGeoJSON(census_orc.geometry), ST_Point(crimes_orc.longitude, crimes_orc.latitude)) GROUP BY census_orc.name ORDER by cnt desc LIMIT 10;
Notice that this induces a full cross product to be computed between the two tables. This is a very compute and time-intensive operation (taking seconds on my Sandbox). It is more efficient to narrow down the data to only those specific elements in which we are interested. This is why we have narrowed down the search to the most severe crimes and where the description field is labeled 'HOMICIDE'.
This query runs in 266 seconds on my Sandbox. This is directly related to our efforts to trim the data with the available features to avoid unnecessary operations (especially expensive spatial operations). The output:
Coldstream Homestead Montebello 44 Belair-Edison 38 Sandtown-Winchester 37 Central Park Heights 33 Frankford 32 Oliver 30 East Baltimore Midway 27 Broadway East 26 Upton 24 Brooklyn 23 Time taken: 266.383 seconds, Fetched: 10 row(s)
At this point we have set up the ESRI Spatial Framework for Hadoop and executed some simple queries. I encourage you to explore the API a bit more and see if you can discover anything within the data. I also encourage you to explore the other data sets provided by the city of Baltimore. There are many more data sets available that can be correlated to to these and lead to interesting results (for example: CCTV locations).
Thanks to @David Kaiser for spending the time with me and providing several pointers and tips to get me up to speed on this space.
Created on 12-03-2015 08:54 PM
@Brandon Wilson Thank you for this!!
Created on 12-18-2015 07:21 AM
If you are exploring GeoSpatial processing with Spark you should check out Magellan.