Member since
09-14-2015
04-23-2016
04:58 AM
After completing this tutorial you will understand how to:
leverage Spark to infer a schema on a CSV dataset and persist it to Hive without explicitly declaring the DDL
deploy the Spark Thrift Server on the Hortonworks Sandbox
connect an ODBC tool (Tableau) to the Spark Thrift Server via the Hive ODBC driver, leveraging caching for ad-hoc visualization
Assumption 1: It is assumed that you have downloaded and deployed the Hortonworks Sandbox, installed the Hive ODBC driver on your host machine, and installed Tableau (or your preferred ODBC-based reporting tool).
Assumption 2: Please ensure that your host machine's /etc/hosts file has the appropriate entry mapping sandbox.hortonworks.com to the IP of your sandbox (e.g., 172.16.35.171 sandbox.hortonworks.com sandbox).
Deploying the Spark Thrift Server
Within Ambari, click on the Hosts tab and then select the sandbox.hortonworks.com node from the list.
Now you can click “Add” and choose Spark Thrift Server from the list to deploy a Thrift server.
After installing, start the Thrift server via the service menu.
Loading the Data
The code blocks below are each intended to be executed in their own Zeppelin notebook cells. Each cell begins with a '%' indicating the interpreter to be used.
Open Zeppelin and create a new notebook: http://sandbox.hortonworks.com:9995
Download and take a peek at the first few lines of the data:
%sh
wget https://dl.dropboxusercontent.com/u/3136860/Crime_Data.csv
hdfs dfs -put Crime_Data.csv /tmp
head Crime_Data.csv
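Load the CSV reader dependency (run this cell before any Spark cell, because %dep only takes effect before the Spark interpreter starts):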
%dep
z.load("com.databricks:spark-csv_2.10:1.4.0")
Read the CSV file and infer the schema:
%pyspark
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
data = sqlContext.read.load("/tmp/Crime_Data.csv", format="com.databricks.spark.csv", header="true", inferSchema="true")
data.printSchema()
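To make the idea behind inferSchema concrete, here is a minimal pure-Python sketch of column type inference. It is a simplified illustration, not the spark-csv implementation: it only distinguishes int, double, and string, and the sample rows are hypothetical rather than taken from Crime_Data.csv.

```python
import csv
import io


def infer_type(values):
    """Return the narrowest type name that fits every non-empty value."""
    def fits(cast):
        try:
            for value in values:
                if value != "":
                    cast(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"


def infer_schema(csv_text):
    """Infer {column: type} from CSV text whose first row is the header."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    return {name: infer_type([row[i] for row in body])
            for i, name in enumerate(header)}


# Hypothetical sample rows (assumption: not real rows from Crime_Data.csv).
sample = "Code,Description,Post\n4E,COMMON ASSAULT,913\n6D,LARCENY FROM AUTO,414\n"
print(infer_schema(sample))
```

A column is only given a numeric type when every value parses, which is why a single stray value in a large file can turn a numeric column into a string column.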
Persist the data to Hive:
%pyspark
# registerTempTable replaces the deprecated registerAsTable
data.registerTempTable("staging")
sqlContext.sql("CREATE TABLE crimes STORED AS ORC AS SELECT * FROM staging")
Verify the data is present and can be queried:
%sql
select Description, count(*) cnt from crimes
group by Description order by cnt desc
Connecting Tableau via ODBC
Connect using the Hortonworks Hadoop Hive connector:
Run the “Initial SQL” to cache the crimes table:
Verify the table is cached in the Thrift Server UI: http://sandbox.hortonworks.com:4040/storage/
Select the default schema and drag the crimes table into the tables area.
Go to the worksheet and start exploring the data using the cached table!
12-08-2015
07:06 PM
Environment
Best practices dictate that, where possible, a Hadoop cluster should be maintained behind a firewall to minimize any potential security vulnerabilities that may arise from exposed ports and web interfaces. A common approach to enabling user access in this situation is to open up SSH into a set of gateway/edge nodes. This ensures that users must authenticate prior to accessing any pieces of the Hadoop ecosystem and implicitly encrypts all data sent between the client and the cluster. This is a common setup for vanilla cloud-based installations.
The problem with this setup is that, by default, all access is limited to the CLI on the gateway machines. Users outside of the cluster firewall cannot access valuable features such as web UIs and JDBC/ODBC connections. There are a few options to securely enable these capabilities:
1. Enable Kerberos+SPNEGO and Knox. Then open up the appropriate ports in the firewall.
2. Implement firewall rules to expose specific ports and hosts to a subset of known client IPs.
3. Leverage SSH tunneling to route traffic over an SSH connection and into the cluster.
This article focuses on #3. The best solution will vary on a case-by-case basis, but SSH tunneling is the simplest and requires no intervention by ops staff once SSH is enabled.
Accessing Web UIs via a SOCKS Proxy
You can use SSH to open a local port that connects to a remote environment and behaves like a SOCKS proxy. Once this tunnel is established, you can configure your web browser to use the proxy, and all web traffic will be routed over the tunnel and into the cluster environment (behind the firewall, where the environment is open). The following command will open a tunnel to the machine gateway.hdp.cluster, which has SSH enabled:
ssh -D 8080 -f -C -q -N username@gateway.hdp.cluster
The parameters map to the following:
-D the local port to listen on
-f send this ssh operation into the background after password prompts
-C use compression
-q quiet mode: suppress warnings and diagnostic messages
-N do not execute a remote command or wait for the user to provide any commands
Once the tunnel is established, open your web browser and navigate to the "Network Settings" tab. Under the proxy settings, enable the SOCKS proxy and enter localhost and port 8080. Now all web traffic from your browser will be routed over the tunnel and appear as if it is coming from gateway.hdp.cluster. You should be able to load web UIs that are behind the firewall, such as Ambari or the NameNode UI.
Establishing an ODBC/JDBC Connection via SSH Tunnel
For an ODBC/JDBC connection, the behavior we want is a bit different from the previous section. We want to map a local port to a port on a remote machine within the firewall, specifically the HiveServer2 port. We can do that as follows:
ssh -L 10000:hiveserver2.hdp.cluster:10000 username@gateway.hdp.cluster
Now, an application on the client can connect to localhost on port 10000 and, to the application, it will appear as if it is connecting directly to hiveserver2.hdp.cluster on port 10000. Under the covers, data is actually going over the SSH tunnel to gateway.hdp.cluster and then being routed to port 10000 on the hiveserver2.hdp.cluster node.
To configure the ODBC/JDBC connection on the client, simply use localhost and port 10000 in place of the HiveServer2 host as part of the JDBC/ODBC connection parameters.
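If you script these tunnels, it can help to build the two ssh invocations programmatically and to check that the local end is listening before pointing a browser or ODBC client at it. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, the host names are the example ones used in this article, and the port check only confirms that something accepts TCP connections locally, not that the remote service is healthy.

```python
import socket


def socks_proxy_command(user, gateway, local_port=8080):
    """Build the argv for the SOCKS proxy tunnel (ssh -D)."""
    return ["ssh", "-D", str(local_port), "-f", "-C", "-q", "-N",
            f"{user}@{gateway}"]


def port_forward_command(user, gateway, target_host, target_port, local_port):
    """Build the argv for a local port forward (ssh -L)."""
    forward = f"{local_port}:{target_host}:{target_port}"
    return ["ssh", "-L", forward, f"{user}@{gateway}"]


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example host names from this article; substitute your own.
command = port_forward_command("username", "gateway.hdp.cluster",
                               "hiveserver2.hdp.cluster", 10000, 10000)
print(" ".join(command))
```

The argv lists can be passed to subprocess.run, and port_is_open("localhost", 10000) can then be polled until the forward is ready.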
12-03-2015
06:13 PM
Overview
The objective of this article is to introduce the ESRI Spatial Framework for Hadoop and demonstrate how to use it with HDP 2.3.2. The following is a high-level view of how we will accomplish this:
set up the ESRI Spatial Framework for Hadoop on the HDP 2.3.2 sandbox
ETL data from the Open Baltimore Data Portal
execute simple geospatial queries to correlate crime data in Baltimore with spatial neighborhood data
Prior to continuing the tutorial, please ensure you have downloaded and started the HDP Sandbox.
ESRI Spatial Framework for Hadoop
The ESRI Spatial Framework for Hadoop is a collection of Hive UDFs that allow users to perform complex spatial analysis directly in Hive. The framework has built-in support for representing geometric shapes (point, polygon, etc.) as well as functions that operate on these shapes. For example, users can perform a binary test for overlap between a pair of polygons or compute the geometry of the intersection. This framework provides a powerful method for stitching together datasets with geospatial features that could not otherwise be correlated.
Setting up the framework on HDP
The framework itself is open source, so we need to clone the repository and build its dependencies in order to use it. This can be done as follows:
git clone https://github.com/Esri/geometry-api-java.git
cd geometry-api-java
mvn clean install
cd ..
git clone https://github.com/Esri/spatial-framework-for-hadoop.git
cd spatial-framework-for-hadoop
mvn clean package
The relevant libraries that we will use later are geometry-api-java/target/esri-geometry-api-1.2.1.jar and spatial-framework-for-hadoop/hive/target/spatial-sdk-hive-1.1.1-SNAPSHOT.jar. Please note their locations for later use.
Data
The data we will use is sourced from the Open Baltimore Data portal. I have linked (in the next section) data that I have already obtained and prepped, but I include the steps in this section for completeness. There is no need to perform these steps if you use the attached files, in which case you can proceed directly to the Importing Data into Hive section.
Neighborhood Census Data
The data relating to the 2010 census can be exported as an ESRI Shapefile from here. This includes a breakdown of demographics and, more importantly, shape data for the polygon that represents each neighborhood. The export zip archive contains several files, but the *.shp file is the one we are after. The spatial framework supports a couple of JSON formats, so we need to convert the Shapefile before we can use it. On your sandbox, you can convert this Shapefile to GeoJSON format with the following steps:
yum install gdal
ogr2ogr -f "Geojson" -t_srs "WGS84" 2010_census.json 2010_Census_Profile_by_Neighborhood_Statistical_Areas.shp
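ogr2ogr writes the features inside a GeoJSON FeatureCollection wrapper, and the trimming step that follows removes that wrapper by hand. If you repeat the conversion often, the trim could be scripted. This is a minimal Python sketch, assuming the wrapper is exactly the first 3 and last 2 lines as this article states (check your own output first, since the count may differ between GDAL versions); the function names are hypothetical.

```python
def strip_wrapper(lines, head=3, tail=2):
    """Drop the first `head` and last `tail` lines, keeping the rows between."""
    if len(lines) < head + tail:
        raise ValueError("input is shorter than the wrapper being removed")
    return lines[head:len(lines) - tail]


def strip_wrapper_file(src_path, dst_path, head=3, tail=2):
    """Read src_path, drop the wrapper lines, and write the rest to dst_path."""
    with open(src_path, encoding="utf-8") as src:
        lines = src.readlines()
    with open(dst_path, "w", encoding="utf-8") as dst:
        dst.writelines(strip_wrapper(lines, head, tail))


# Stand-in lines for illustration only (assumption: not real ogr2ogr output).
demo = ["wrapper 1\n", "wrapper 2\n", "wrapper 3\n",
        "feature A\n", "feature B\n",
        "wrapper 4\n", "wrapper 5\n"]
print(strip_wrapper(demo))
```

strip_wrapper_file takes separate source and destination paths so the untrimmed ogr2ogr output is kept for comparison.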
Now, delete the first 3 lines and the last 2 lines of the file so it starts with:
{ "type": "Feature", "properties": ...
This is the file that is attached here as 2010_census.json. We will load this into Hive in the next step.
Crimes Data
The crime data can be exported as a CSV file from here. There is no prep needed for this file. It is also attached as Crime_Data.csv.
Importing Data into Hive
This section will walk through how to import the attached 2010_census.json and Crime_Data.csv files into HDFS and overlay a schema on them via Hive. The data will remain as-is on disk and the schema will be applied when the table is read. Transfer the two files to the sandbox via SCP or your preferred method. Then put them into the HDFS tmp directory:
hdfs dfs -put 2010_census.json /tmp
hdfs dfs -put Crime_Data.csv /tmp
Note that the 2.3.2 sandbox does not have a directory in HDFS for the root user, and this can cause Hive queries to fail. To remedy this, issue the following commands as root:
su hdfs
hdfs dfs -mkdir /user/root
hdfs dfs -chown root:hdfs /user/root
exit
You will need the most recent version of the Hive JSON serde as well. You can get it as follows:
wget https://github.com/sheetaldolas/Hive-JSON-Serde/a...
unzip json-serde-1.1.9.8.zip
From Hive, you can now create the corresponding crimes and census tables and ingest them into a more optimized ORC format.
Census Data
We can overlay a schema on the Census JSON data using a Hive JSON Serde. The Serde is responsible for telling Hive how to read and write in JSON format. Notice that the schema maps directly to the JSON structure you see in 2010_census.json.
add jar Hive-JSON-Serde-json-serde-1.1.9.8/dist/json-serde-1.1.9.2-Hive13-jar-with-dependencies.jar;
CREATE TABLE census_text (
type string,
properties map<string,string>,
geometry string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
LOAD DATA INPATH '/tmp/2010_census.json' OVERWRITE INTO TABLE census_text;
Next we will create an optimized ORC-backed table and import the Census data. This process will leverage the JSON Serde to apply a schema when the JSON is read, and then we will extract specific properties from the JSON and promote them to first-class fields in the ORC table. This will allow us to leverage the predicate pushdown features of ORC and produce more efficient queries when these fields are used as conditions. Notice that the geometry is left as a plain string; the spatial framework will read this at query time.
set hive.execution.engine=mr;
-- A simple schema with several features promoted to fields in the table.
CREATE TABLE census_orc (
name string,
population double,
male double,
female double,
age_0_4 double,
age_5_11 double,
age_12_14 double,
age_15_17 double,
age_18_24 double,
age_25_34 double,
age_35_44 double,
age_45_64 double,
age_65_ovr double,
vacant double,
occupied double,
geometry string
)
STORED AS ORC;
-- We are casting many features from string to a more appropriate type of double as we
-- ingest into the ORC table.
INSERT INTO TABLE census_orc select properties['name'],
cast(properties['population'] as double),
cast(properties['Male'] as double),
cast(properties['Female'] as double),
cast(properties['AGE0_4'] as double),
cast(properties['AGE5_11'] as double),
cast(properties['AGE12_14'] as double),
cast(properties['AGE15_17'] as double),
cast(properties['AGE18_24'] as double),
cast(properties['AGE25_34'] as double),
cast(properties['AGE35_44'] as double),
cast(properties['AGE45_65'] as double),
cast(properties['AGE65ovr'] as double),
cast(properties['Vacant'] as double),
cast(properties['Occupied'] as double),
geometry from census_text where geometry != 'NULL';
Crime Data
CREATE TABLE crimes_text(crime_date string,
code string,
location string,
description string,
weapon string,
post string,
district string,
neighborhood string,
coordinates string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
TBLPROPERTIES ("skip.header.line.count"="1");
LOAD DATA INPATH '/tmp/Crime_Data.csv' OVERWRITE INTO TABLE crimes_text;
-- Basic schema for an ORC-backed table for crimes
CREATE TABLE crimes_orc(crime_date string,
code string, location string, description string, weapon string, post string, district string, neighborhood string, latitude double, longitude double)
STORED AS ORC;
-- Load the plain CSV data into an optimized ORC file. The coordinates field is split
-- and the individual latitude and longitude are extracted into separate fields.
INSERT INTO TABLE crimes_orc
SELECT crime_date, code, location, description, weapon, post, district, neighborhood, cast(substr(split(coordinates, ',')[0], 2) as double), cast(split(split(coordinates, ',')[1], '\\)')[0] as double)
FROM crimes_text;
Example Queries
Now that we have done all of the work to set up the framework and get the data prepped, we can finally run a couple of sample queries. We will add the ESRI jars and create some simple aliases for the spatial functions that we need as follows:
add jar geometry-api-java/target/esri-geometry-api-1.2.1.jar;
add jar spatial-framework-for-hadoop/hive/target/spatial-sdk-hive-1.1.1-SNAPSHOT.jar;
create temporary function ST_Point as 'com.esri.hadoop.hive.ST_Point';
create temporary function ST_Contains as 'com.esri.hadoop.hive.ST_Contains';
create temporary function ST_GeomFromGeoJson as 'com.esri.hadoop.hive.ST_GeomFromGeoJson';
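To build intuition for what ST_Point and ST_Contains do together, here is a small pure-Python sketch of a point-in-polygon test using ray casting. It is an illustration only, not the ESRI implementation: it handles a single outer ring with no holes, the function names are hypothetical, and the square ring and coordinates are made-up values. Note that the point is built as (longitude, latitude), i.e. (x, y), matching the argument order used with ST_Point in this article.

```python
def parse_coordinates(text):
    """Parse a '(lat, lon)' string into a (lat, lon) tuple of floats."""
    lat, lon = text.strip().strip("()").split(",")
    return float(lat), float(lon)


def ring_contains(ring, x, y):
    """Ray casting: is point (x, y) inside the ring of (x, y) vertices?"""
    inside = False
    j = len(ring) - 1
    for i in range(len(ring)):
        xi, yi = ring[i]
        xj, yj = ring[j]
        # Count the edges crossed by a horizontal ray from (x, y) toward +x.
        if (yi > y) != (yj > y):
            x_cross = xi + (y - yi) * (xj - xi) / (yj - yi)
            if x < x_cross:
                inside = not inside
        j = i
    return inside


# Made-up square "neighborhood" as (longitude, latitude) vertices.
square = [(-76.7, 39.2), (-76.5, 39.2), (-76.5, 39.4), (-76.7, 39.4)]

lat, lon = parse_coordinates("(39.3, -76.6)")
print(ring_contains(square, lon, lat))
```

parse_coordinates mirrors the substr/split expression used to populate crimes_orc, assuming the coordinates column holds text of the form "(latitude, longitude)".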
One quick query we can run is to map crimes to a neighborhood via the ST_Contains relationship test and count the total per neighborhood.
set hive.vectorized.execution.enabled = false;
set hive.execution.engine=tez;
select census_orc.name, count(*) cnt FROM
census_orc JOIN crimes_orc WHERE crimes_orc.description="HOMICIDE" and
ST_Contains(ST_GeomFromGeoJSON(census_orc.geometry), ST_Point(crimes_orc.longitude, crimes_orc.latitude))
GROUP BY census_orc.name ORDER by cnt desc LIMIT 10;
Notice that this induces a full cross product to be computed between the two tables. This is a very compute- and time-intensive operation (taking seconds on my Sandbox). It is more efficient to narrow down the data to only those specific elements in which we are interested. This is why we have narrowed the search to the most severe crimes, those where the description field is 'HOMICIDE'. This query runs in 266 seconds on my Sandbox. This is directly related to our efforts to trim the data with the available features to avoid unnecessary operations (especially expensive spatial operations).
The output:
Coldstream Homestead Montebello 44
Belair-Edison 38
Sandtown-Winchester 37
Central Park Heights 33
Frankford 32
Oliver 30
East Baltimore Midway 27
Broadway East 26
Upton 24
Brooklyn 23
Time taken: 266.383 seconds, Fetched: 10 row(s)
Conclusion
At this point we have set up the ESRI Spatial Framework for Hadoop and executed some simple queries. I encourage you to explore the API a bit more and see if you can discover anything within the data. I also encourage you to explore the other data sets provided by the city of Baltimore. There are many more data sets available that can be correlated with these and lead to interesting results (for example: CCTV locations).
Thanks to @David Kaiser for spending the time with me and providing several pointers and tips to get me up to speed on this space.