03-21-2017
06:47 PM
I tested the above code with Spark 1.6.2; I have not tested it with Spark 2.0. I see you're using Spark 2.0, which uses SparkSession in place of both HiveContext and SQLContext, so you will have to use SparkSession. See below. After creating the SparkSession as I do below, I don't know offhand whether the following will work:
spark.load(
source="jdbc",
url="jdbc:sqlserver://ec2-54-244-44-6.us-west-2.compute.amazonaws.com:1433;database=sales;user=my_username;password=my_password",
dbtable="orders")
Creating the SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("my_application_name") \
    .config("spark.shuffle.service.enabled","true") \
    .config("spark.dynamicAllocation.enabled","true") \
    .getOrCreate()
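For reference, Spark 2.x reads JDBC sources through the DataFrameReader (spark.read) rather than through a load() method on the context. The following is only a sketch under that assumption and was not run against the SQL Server instance above; the helper names sqlserver_jdbc_options and read_orders are hypothetical, and the driver jar still has to be on the classpath.

```python
def sqlserver_jdbc_options(host, port, database, user, password, table):
    """Build the option map for Spark's built-in JDBC data source (hypothetical helper)."""
    url = "jdbc:sqlserver://{0}:{1};database={2}".format(host, port, database)
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        # Driver class shipped in Microsoft's SQL Server JDBC jar
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }


def read_orders(spark, options):
    """Load the table as a DataFrame; `spark` is an existing SparkSession."""
    return spark.read.format("jdbc").options(**options).load()
```

With a session created as shown above, the call would look like read_orders(spark, sqlserver_jdbc_options(host, 1433, "sales", "my_username", "my_password", "orders")).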
03-17-2017
03:44 PM
3 Kudos
This tutorial will demonstrate how to execute PySpark jobs on an HDP cluster and pass in parameter values using the Livy REST interface. I will also demonstrate how to interact with Livy via Apache Zeppelin and use forms in Zeppelin to pass in parameter values. Livy is an open source REST interface for interacting with Spark.
Livy requires at least Spark 1.6.
By default, Livy is built against Apache Spark 1.6.2 with HDP 2.5.3
If on HDP 2.4.2, then Livy 0.2.0 might be part of HDP 2.4.2 repos. If not, then it can be cloned from: https://github.com/cloudera/livy
The following was tested with HDP 2.5.3 and Spark 1.6.2.
Business Use Case:
A PySpark script that resides on the Livy server aggregates order statistics from a Hive table for a date range passed in as parameters to the Python script.
The Start and End dates are passed in as parameter values to the Python script.
The job runs on the cluster via YARN and produces the following output:
Start Date | End Date | City | Min Order Amt | Max Order Amt | Avg Order Amt
2017-02-06 | 2017-02-15 | Chicago | 54.23 | 788.23 | 354.23
2017-02-06 | 2017-02-15 | San Francisco | 98.23 | 1032.12 | 634.12
Users can execute the PySpark script via Livy using curl from a terminal window, Apache Zeppelin, or other applications.
Execute PySpark script on HDP cluster via Livy using curl
from pyspark import SparkContext, HiveContext
import sys
sc = SparkContext('yarn-client')
spark = HiveContext(sc)
start_date=sys.argv[1]
end_date=sys.argv[2]
sql = """
select '%s' start_date, '%s' as end_date, city, min(order_amt) as min_order_amount, max(order_amt) as max_order_amount, avg(order_amt) as average_order_amount from orders where
to_date(order_date) between '%s' and '%s' group by city order by city
""" %(start_date,end_date,start_date,end_date)
spark.sql(sql).show()
How to call PySpark Script:
The PySpark script is a regular file on the Livy server. Notice I’m setting the location of the file with ‘file:’
Livy allows for impersonation to create a shell running as a different user. This is the setting ‘proxyUser’. Notice that I’m running the script as ‘test_livy_impersonation’
All the properties supported by Spark such as the number of executors, number of cores, executor memory, etc., can be changed at session creation. Their format is the same as when executing via spark-submit. Notice that I’m setting the number of executor cores to 4.
I’m passing parameters to the script by setting values for ‘args’
Command to run script:
curl -X POST -H "Content-Type: application/json" ec2-xxxxxx.compute.amazonaws.com:8998/batches --data '{"file": "file:/tmp/test.py", "name": "Livy PySpark Example", "executorCores":4, "proxyUser": "test_livy_impersonation","args":["2017-01-15","2017-01-19"]}' | python -m json.tool
*Note: The default Livy port will be 8998
After executing the above command, you will get the following output which gives you the session ID number:
Get output:
Depending on the complexity of processing and availability of resources on the cluster, the job completion could take some time.
Take the session ID from the output above and use it in the following command to get the results from the log file:
curl ec2-xxxxxx.us-east-2.compute.amazonaws.com:8998/batches/35/log | python -m json.tool
*Note: Replace 35 above with your session ID
Results from the log file:
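The same two REST calls (POST to /batches, then GET on /batches/<id>/log) can also be made from Python instead of curl. This is a minimal sketch using only the standard library; the host name in LIVY_URL and the function names are placeholders I made up for illustration, and no Kerberos or other authentication is handled.

```python
import json
import urllib.request

LIVY_URL = "http://livy-host.example.com:8998"  # hypothetical host; 8998 is the default Livy port


def build_batch_request(script, start_date, end_date, proxy_user, executor_cores=4):
    """Assemble the same JSON body that the curl command sends."""
    return {
        "file": script,
        "name": "Livy PySpark Example",
        "executorCores": executor_cores,
        "proxyUser": proxy_user,
        "args": [start_date, end_date],
    }


def submit_batch(payload, base_url=LIVY_URL):
    """POST the batch and return the ID that Livy assigns to it."""
    request = urllib.request.Request(
        base_url + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))["id"]


def fetch_log(batch_id, base_url=LIVY_URL):
    """GET the log lines for a batch; call again later if the job is still running."""
    url = "{0}/batches/{1}/log".format(base_url, batch_id)
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))["log"]
```

Typical use would be batch_id = submit_batch(build_batch_request("file:/tmp/test.py", "2017-01-15", "2017-01-19", "test_livy_impersonation")), followed by fetch_log(batch_id) once the job has had time to run.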
Zeppelin with Livy and pass in parameter values via Zeppelin forms
At a minimum, configure Zeppelin’s Livy server interpreter and set ‘zeppelin.livy.url’ to your Livy server:
Also, set livy.spark.master to YARN if you want jobs to run on YARN:
If your cluster is Kerberized, then you will need to set values for ‘zeppelin.livy.keytab’ and ‘zeppelin.livy.principal’:
Sample output from my Hive table using Spark Sql interpreter (%sql):
You can access the data in this Hive table using PySpark via the Livy interpreter (%livy.pyspark) with input forms that pass parameter values to your PySpark script:
In your PySpark code enclose parameters with ‘${parameter_name}’. In the example below I am passing in start date and end date as ${start_date} and ${end_date}.
Input boxes will appear to enter values
This also works with the Spark SQL (%sql) interpreter:
02-09-2017
03:30 AM
2 Kudos
BACKGROUND
The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, Geospatial data, and more. ETL (Extract-Transform-Load) is a process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast general purpose distributed computation engine for fault-tolerant parallel data processing. Spark is an excellent choice for ETL:
Works with a myriad of data sources: files, RDBMSs, NoSQL, Parquet, Avro, JSON, XML, and many more.
Write your ETL code using Java, Scala, or Python.
In-memory computing for fast data processing.
APIs to easily create schemas for your data and perform SQL computations.
Distributed computing and fault-tolerance are built into the framework and abstracted from the end-user.
This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.
SOURCE DATA
Pipe (|) delimited text files stored in HDFS containing information about customer orders:
Customer table in MySQL containing information about customers:
We need to join the data from the orders files in HDFS with the customer data in MySQL to produce a new file in HDFS that maps customer names to orders. The new file will be stored in ORC format. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, the output will look like the screenshot below, but this is not how the data is stored. I'm only showing this to visualize the joined data set. ORC is columnar storage and not row storage.
CODE
The code below was tested using Spark 2.1. Beginning in Spark 2.0 the SQLContext and HiveContext have been replaced with SparkSession; however, both SQLContext and HiveContext are preserved for backwards compatibility. Notice in my code below that I'm using SparkSession.
To run the code below you will need the CSV parsing library from DataBricks: https://github.com/databricks/spark-csv
If you're working with large data volumes, then I recommend that you use the CSV parsing library from DataBricks as it will be much faster than using your typical lambda function to parse out fields from a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.html
Run the code using:
spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
## create a SparkSession
spark = (SparkSession
    .builder
    .appName("join_file_with_mysql")
    .config("spark.shuffle.service.enabled","true")
    .config("spark.dynamicAllocation.enabled","true")
    .config("spark.executor.cores","5")
    .getOrCreate())
## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])
## create a DataFrame for the orders files stored in HDFS
df_orders = (spark.read
    .format('com.databricks.spark.csv')
    .options(header='true', delimiter='|')
    .load('hdfs:///orders_file/', schema = customSchema))
## register the orders data as a temporary table
df_orders.registerTempTable("orders")
## create a DataFrame for the MySQL customer table
df_customers = spark.read.format("jdbc").options(
url ="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
driver="com.mysql.jdbc.Driver",
dbtable="customer",
user="root",
password="password"
).load()
## register the customers data as a temporary table
df_customers.registerTempTable("customers")
## join the DataSets
sql='''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
'''
output = spark.sql(sql)
## save the data into an ORC file
output.write.format("orc").save("/tmp/customer_orders")
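As an aside, Spark 2.x also ships a built-in CSV reader (spark.read.csv), which could be used for the pipe-delimited orders files in place of the external package. The sketch below is an untested alternative, not the code used in this article; ORDER_COLUMNS, build_orders_schema, and read_orders are names made up for illustration.

```python
ORDER_COLUMNS = ["order_number", "customer_number", "order_quantity", "unit_price", "total"]


def build_orders_schema(columns=ORDER_COLUMNS):
    """Create an all-string schema for the orders files (pyspark is imported lazily)."""
    from pyspark.sql.types import StructField, StructType, StringType
    return StructType([StructField(name, StringType(), True) for name in columns])


def read_orders(spark, schema, path="hdfs:///orders_file/"):
    """Read the pipe-delimited files with the CSV source built into Spark 2.x."""
    return spark.read.csv(path, schema=schema, sep="|", header=True)
```

With an existing SparkSession, the call would be df_orders = read_orders(spark, build_orders_schema()).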
12-18-2016
01:57 PM
4 Kudos
A colleague recently asked me how to create a custom function for Hive using Python. You can create a function in pretty much any language and plug it into your Hive query using the Hive TRANSFORM clause. TRANSFORM lets you add your own mappers and/or reducers to process the data. The example in this article is working code that I wrote a few years ago using an early version of Hive to demonstrate how to add a custom function. In earlier versions of Hive we had to implement our own functions to hash sensitive data for PII compliance. Beginning with Hive 1.3 the SHA2 UDF was added to calculate a hash using SHA-224, SHA-256, SHA-384, or SHA-512. In my example below I create a custom UDF using Python to calculate the SHA-256 hash for social security number. Keep in mind that when I did this there were no out of the box Hive UDFs available. This example only demonstrates how to write your own custom functions for Hive using Python.
First, we need to write some Python code that will read each record passed in from Hive and process the data. Save this to a file:
#!/usr/local/bin/python
import hashlib
import sys
## we are receiving each record passed in from Hive via standard input
## By default, columns will be transformed to STRING and delimited by TAB
## Also, by default, NULL values will be converted to literal string \N to differentiate from empty strings
for line in sys.stdin:
    line = line.strip()
    (customer_no,ssn,plan,join_date,status,balance,region) = line.split('\t')
    ## hash social security number and emit all the fields to standard out
    x = hashlib.sha256(str(ssn))
    ssn = x.hexdigest()
    print '\t'.join([str(customer_no),str(ssn),plan,str(join_date),status,str(balance),region])
Now you can call the above Python code from your HiveQL:
ADD FILE /path-to-my-script/my_python_code.py;
CREATE VIEW customer_data_mart_view.v_customer_balance AS
SELECT
TRANSFORM (
customer_no
,ssn
,plan
,join_date
,status
,balance
,region)
USING '/path-to-my-script/my_python_code.py'
AS customer_no
,ssn
,plan
,join_date
,status
,balance
,region
FROM customer_data_mart.customer_details;
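The Python script in this article uses Python 2 syntax (the print statement and hashing a str directly). If the cluster nodes run Python 3, a roughly equivalent mapper might look like the sketch below. It assumes the same seven tab-delimited columns with the social security number in the second position; the function name hash_ssn_record is made up for illustration.

```python
import hashlib
import sys


def hash_ssn_record(line):
    """Replace the second tab-delimited field (ssn) with its SHA-256 hex digest."""
    fields = line.rstrip("\n").split("\t")
    # Python 3 hashes bytes, so the string has to be encoded first
    fields[1] = hashlib.sha256(fields[1].encode("utf-8")).hexdigest()
    return "\t".join(fields)


def main(stream=sys.stdin):
    # Hive streams one record per line on standard input
    for line in stream:
        print(hash_ssn_record(line))


# Only consume standard input when something is actually piped in
if __name__ == "__main__" and not sys.stdin.isatty():
    main()
```

The hashing is kept in its own function so it can be checked locally on a sample line before the script is added to a Hive session.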
12-07-2016
09:19 AM
4 Kudos
I recently worked with a customer to demo HDF/NiFi DataFlows for their use cases. One use case involved converting delimited row-formatted text files into Parquet and ORC columnar formats. This can quickly be done with HDF/NiFi.
Here is an easy to follow DataFlow that will convert row-formatted text files to Parquet and ORC. It may look straightforward; however, it requires some basic knowledge of Avro file formats and use of the Kite API. This article will explain the DataFlow in detail.
STEP 1: Create an Avro schema for the source data
My source data is tab delimited and consists of 8 fields.
Notice in the DataFlow that before converting the data to Parquet and ORC the data is first converted to Avro. This is done so we have schema information for the data. Prior to converting the data to Avro we need to create an Avro schema definition file for the source data. Here is what my schema definition file looks like for the 8 fields. I stored this in a file named sample.avsc.
More information on Avro schemas can be found here: https://avro.apache.org/docs/1.7.7/spec.html
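For readers who have not written an Avro schema before, the sketch below generates a record schema with 8 nullable string fields and prints it as JSON, which could then be saved as sample.avsc. The field names are placeholders invented for illustration; they are not the fields of the source data in this article.

```python
import json


def build_avro_schema(record_name, field_names):
    """Build an Avro record schema in which every field is a nullable string."""
    return {
        "type": "record",
        "name": record_name,
        "fields": [
            # "null" comes first in the union so that the default can be null
            {"name": name, "type": ["null", "string"], "default": None}
            for name in field_names
        ],
    }


# Hypothetical column names; substitute the 8 fields of your own file
HYPOTHETICAL_FIELDS = ["field_1", "field_2", "field_3", "field_4",
                       "field_5", "field_6", "field_7", "field_8"]

schema = build_avro_schema("sample", HYPOTHETICAL_FIELDS)
print(json.dumps(schema, indent=2))
```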
STEP 2: Use Kite API to create a Parquet dataset
The DataFlow uses the ‘StoreInKiteDataset’ processor. Before we can use this processor to convert the Avro data to Parquet we need to have a directory already created in HDFS to store the data as Parquet. This is done by calling the Kite API.
Kite is an API for Hadoop that lets you easily define how your data is stored:
Works with file formats including CSV, JSON, Avro, and Parquet
Stores data in Hive, HDFS, the local file system, HBase, and Amazon S3
Compresses data: Snappy (default), Deflate, Bzip2, and Lzo
Kite will handle how the data is stored. For example, if I wanted to store incoming CSV data into a Parquet formatted Hive table, I could use the Kite API to create a schema for my CSV data and then call the Kite API to create the Hive table for me. Kite also works with partitioned data and will automatically partition records when writing.
In this example I am writing the data into HDFS.
Call Kite API to create a directory in HDFS to store the Parquet data. The file sample.avsc contains my schema definition:
./kite-dataset create dataset:hdfs://ip-172-31-2-101.xxxx.compute.internal:8020/tmp/sample_data/parquet --schema sample.avsc --format parquet
If you want to load directly into a Hive table, then you would call the Kite API using the following command:
./kite-dataset create sample_hive_table --schema sample.avsc --format parquet
To learn more about the Kite API and to download, follow this link: http://kitesdk.org/docs/current/
STEP 3: Get source data
The source files could exist in one or more places, including a remote server, the local file system, or HDFS. This example uses files stored in HDFS.
STEP 4: Convert data to Avro format
Configure the 'ConvertCSVToAvro' processor to specify the location for the schema definition file and specify properties of the source delimited file so NiFi knows how to read the source data.
STEP 5: Convert data to columnar Parquet format
Configure the 'StoreInKiteDataset' processor to set the URI for your Kite dataset.
My target dataset URI is: dataset:hdfs://ip-172-31-2-101.xxxx.compute.internal:8020/tmp/sample_data/parquet
This is the directory I created in STEP 2. I'm writing to an HDFS directory. I could also write directly to a Hive table.
STEP 6: Convert data to columnar ORC format and store in HDFS
These 2 are straightforward. For the 'ConvertAvroToORC' processor you can specify the ORC stripe size as well as optionally compress the data.
'ConvertAvroToORC' processor settings:
Data before and after conversion
The text data is considerably larger than the Parquet and ORC files.
12-01-2016
03:21 AM
4 Kudos
Row and Columnar Storage For Hive
Customers often ask about columnar storage formats for Hive tables and
when to use them. It depends on your use cases. If your data access
patterns mostly involve selecting a few columns to perform aggregations,
then using columnar storage will save disk space, reduce I/O when
fetching data, and improve query execution time. In this article I
tested the columnar ORC file format for Hive to quantify how it
outperforms row based Text files when used for queries that aggregate and group on a few columns from a table with many columns.
ORC is a columnar storage format used in Hadoop for Hive tables. It is an efficient file format for storing data in which records contain many columns. An example is Clickstream (web) data to analyze website activity and performance. Queries typically retrieve a subset of columns for each row.
SUMMARY OF TEST RESULTS:
Comparing ORC vs Text files for storing 22 million clickstream page view records in Hive tables
Each row has 40 columns
Data files are uncompressed (i.e. no CODEC such as Snappy, ZLIB, LZO, etc…)
52% Reduction in disk space when using ORC. High columnar storage compression:
97% Reduction in disk I/O when using ORC:
21% Improvement in HiveQL query execution time when using ORC:
HiveQL is counting the number of page views by Operating System
and then ranking each Operating System by order of most page views in
descending order. Sample output showing a few rows:
Using ORC file format may not always equate to significantly less Memory
and CPU usage for analytical queries than if using row based Text files
for high volumes of data. In fact, memory usage could be greater when
using ORC format. You can optimize Memory usage by optionally
compressing the data using a CODEC such as ZLIB or Snappy; however, CPU
time will then increase because of compression and decompression.
11-28-2016
06:15 AM
11 Kudos
SYNOPSIS
The Optimized Row Columnar (ORC) file is a columnar storage format for Hive. Specific Hive configuration settings for ORC formatted tables can improve query performance resulting in faster execution and reduced usage of computing resources. Some of these settings may already be turned
on by default, whereas others require some educated guesswork.
The table below compares Tez job statistics for the same Hive query that was submitted without and with certain configuration settings. Notice the performance gains with optimization. This article will explain how the performance improvements were achieved.
QUERY EXECUTION
Source Data:
102,602,110 Clickstream page view records across 5 days of data for multiple countries
Table is partitioned by date in the format YYYY-MM-DD.
There are no indexes and table is not bucketed.
The HiveQL is ranking each page per user by how many times the user viewed that page for a specific date and within the United States. Breakdown of the query:
Scan all the page views for each user.
Filter for page views on 1 date partition and only include traffic in the United States.
For each user, rank each page in terms of how many times it was viewed by that user.
For example, I view Page A 3 times and Page B once. Page A would rank 1 and Page B would rank 2.
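To make the ranking logic concrete, here is a small Python sketch of the same idea on in-memory data. It is not the HiveQL used in the test; the function name and the tie handling (pages with equal view counts share a rank, as with SQL RANK()) are my assumptions.

```python
from collections import Counter, defaultdict


def rank_pages_per_user(page_views):
    """page_views is an iterable of (user, page) pairs, one pair per page view."""
    counts = defaultdict(Counter)
    for user, page in page_views:
        counts[user][page] += 1

    ranked = {}
    for user, pages in counts.items():
        # Most viewed first; the page name breaks ties so the order is stable
        ordered = sorted(pages.items(), key=lambda item: (-item[1], item[0]))
        rows, rank, previous_views = [], 0, None
        for position, (page, views) in enumerate(ordered, start=1):
            if views != previous_views:
                rank, previous_views = position, views
            rows.append((page, views, rank))
        ranked[user] = rows
    return ranked


# The example from the text: Page A viewed 3 times, Page B once
views = [("me", "Page A")] * 3 + [("me", "Page B")]
print(rank_pages_per_user(views))
```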
Without optimization
With optimization
Notice the change in reducers
The final output size of all the reducers is 920 MB.
For the first run, 73 reducers completed resulting in 73 output files. This is excessive. 920 MB into 73 reducers is around 12.5 MB per reducer output. This is unnecessary overhead resulting in too many small files. More parallelism does not always equate to better performance.
The second run launched 10 reducers resulting in 10 reduce files. 920 MB into 10 reducers is about 92 MB per reducer output. Much less overhead and we don’t run into the small files problem. The maximum number of files in HDFS depends on the amount of memory available in the NameNode. Each block, file, and directory in HDFS is represented as an object in the NameNode’s memory each of which occupies about 150 Bytes.
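The arithmetic behind these figures can be reproduced in a few lines of Python. The NameNode estimate assumes that each output file fits in a single HDFS block, so every file costs one file object plus one block object at roughly 150 bytes each; that is my simplification, not a measured number.

```python
def avg_output_mb(total_output_mb, reducers):
    """Average size of each reducer output file in MB."""
    return total_output_mb / float(reducers)


def namenode_bytes(num_files, blocks_per_file=1, bytes_per_object=150):
    """Rough NameNode heap used: one file object plus its block objects per file."""
    return num_files * (1 + blocks_per_file) * bytes_per_object


for reducers in (73, 10):
    print(reducers, "reducers:",
          round(avg_output_mb(920, reducers), 1), "MB per file,",
          namenode_bytes(reducers), "bytes of NameNode memory")
```

For a single query the NameNode cost is tiny either way; the concern is the accumulation of small files when such jobs run repeatedly.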
OPTIMIZATION
Always collect statistics on those tables for which data changes frequently. Schedule an automated ETL job to run at certain times:
ANALYZE TABLE page_views_orc COMPUTE STATISTICS FOR COLUMNS;
Run the Hive query with the following settings:
SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled = true;
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
SET hive.tez.auto.reducer.parallelism=true;
SET hive.tez.max.partition.factor=20;
SET hive.exec.reducers.bytes.per.reducer=128000000;
Partition your tables by date if you are storing a high volume of data per day. Table management becomes easier. You can easily drop partitions that are no longer needed or for which data has to be reprocessed.
SUMMARY
Let’s look at each of the Hive settings.
Enable predicate pushdown (PPD) to filter at the storage layer:
SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;
Vectorized query execution processes data in batches of 1024 rows instead of one by one:
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
Enable the Cost Based Optimizer (CBO) for efficient query execution based on cost and fetch table statistics:
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
Partition and column statistics are fetched from the metastore. Use this with caution. If you have too many partitions and/or columns, this could degrade performance.
Control reducer output:
SET hive.tez.auto.reducer.parallelism=true;
SET hive.tez.max.partition.factor=20;
SET hive.exec.reducers.bytes.per.reducer=128000000;
This last set is important. The first run produced 73 output files with each file being around 12.5 MB in size. This is inefficient as I explained earlier. With the above settings, we are basically telling Hive an approximate maximum number of reducers to run with the caveat that the size for each reduce output should be restricted to 128 MB. Let's examine this:
The parameter hive.tez.max.partition.factor is telling Hive to launch up to 20 reducers. This is just a guess on my part and Hive will not necessarily enforce this. My job completed with only 10 reducers - 10 output files.
Since I set a value of 128 MB for hive.exec.reducers.bytes.per.reducer, Hive will try to fit the reducer output into files that come close to 128 MB each and not just run 20 reducers.
If I did not set hive.exec.reducers.bytes.per.reducer, then Hive would have launched 20 reducers, because my query output would have allowed for this. I tested this and 20 reducers ran.
128 MB is an approximation for each reducer output when setting hive.exec.reducers.bytes.per.reducer. In this example the total size of the output files is 920 MB. Hive launched 10 reducers which is about 92 MB per reducer file. When I set this to 64 MB, then Hive launched the 20 reducers with each file being around 46 MB.
If hive.exec.reducers.bytes.per.reducer is set to a very high value, fewer reducers are launched than with a lower value, and too few reducers can also degrade performance. You need just the right level of parallelism.
11-18-2016
09:57 AM
11 Kudos
Synopsis
You are a marketing analyst tasked with developing a BI dashboard for management that reports on website activity in realtime throughout the day for Black Friday, the day after Thanksgiving.
Unfortunately, all of the data engineers are busy with other projects and cannot immediately help you.
Not to worry. With Hortonworks DataFlow, powered by Apache NiFi, you can easily create a data flow in minutes to process the data needed for the dashboard.
Source Data:
Fictitious online retailer Initech Corporation
JSON Formatted Clickstream data
Your company stores the raw Clickstream log files in HDFS. The JSON formatted log files are written to HDFS directories partitioned by day and hour. For example:
/data/clickstream/2016-11-20/hour=00
/data/clickstream/2016-11-20/hour=01
/data/clickstream/2016-11-20/hour=...
/data/clickstream/2016-11-20/hour=23
There could also be subdirectories under the hour
This directory structure or a similar variant is commonly found in organizations when storing Clickstream logs
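As a quick illustration of this layout, the following sketch lists the 24 hourly directories for one day. The base path matches the example above; the function name is hypothetical.

```python
def hourly_dirs(base, day):
    """Return the 24 hour=NN partition directories for one day of Clickstream data."""
    return ["{0}/{1}/hour={2:02d}".format(base, day, hour) for hour in range(24)]


for path in hourly_dirs("/data/clickstream", "2016-11-20")[:3]:
    print(path)
```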
The JSON data contains nested hierarchies. Sample JSON record:
Solution considerations:
New files are written throughout the day for each hour and late arriving data is added to existing hours.
Since files are constantly arriving, you need the data flow to always be on and running
HBase is a fast and scalable option for the backend datastore:
Data is semi-structured JSON
HBase does not require a Schema
HBase was designed for high throughput writes, whereas Hive is better suited for batch loads
Use Apache Phoenix to create a view atop the HBase table:
Phoenix creates the schema for reporting
Phoenix lets you write ANSI SQL queries against the HBase data
Phoenix JDBC driver allows any BI tool to connect
HDF DataFlow Overview:
* You may need to increase the heap size used by NiFi. You will know if/when you start getting the error: java.lang.OutOfMemoryError
Throughput:
HDF loaded 215,699 records into HBase in under 33 seconds
I'm running HDF on a single node with 64 GB Ram and 16 cores
There are other Hadoop and Hadoop related services running on this node including Node Manager, DataNode, Livy Server, RegionServer, Phoenix Query Server, NFSGateway, Metrics Monitor
Step 1:
Create the HBase table. You can easily do this via Apache Zeppelin.
Create 1 column family 'pageview'
Multiple versions are not needed as there are no updates to this data. Each website hit is unique.
Step 2:
Create the Phoenix view. You can easily do this via Apache Zeppelin.
Step 3:
Configure the ListHDFS processor
The ListHDFS processor will only list new files as they are written to directories
The DataFlow will be constantly running so we do not want to process the same data twice. Thus, the ListHDFS processor will prevent us from processing the same data twice
Our top-level directory is the Clickstream data for 2016-11-20
ListHDFS will list all new files beneath the subdirectories under the top-level directory of 2016-11-20
Notice that the value for 'Directory' is the top-level directory '/data/clickstream/2016-11-20'
The value for 'Recursive Subdirectories' is set to 'true' so that all new files will be picked up as they are added
The Hadoop Configuration Resources are the full path locations to core-site.xml and hdfs-site.xml
Step 4:
Configure the FetchHDFS processor
The ListHDFS processor will list files
FetchHDFS will retrieve the contents from the files listed
Step 5:
Use the SplitText processor to split each JSON record from the HDFS files
I used the processor as is without any changes, other than renaming it to 'Split_Lines_In_File'
Step 6:
Configure the JoltTransformJSON processor to extract only those fields needed for the dashboard.
We only need the following fields from the JSON data:
event ID - unique identifier for each website hit
event time - time when the website hit occurred
url - url of the page viewed
country - country in which the page was served
session id - session id for the user
user cookie
os - the user's device OS
browser - browser that the user used
Some of the above fields are child elements for a top-level parent. The property 'Jolt Specification' needs to be set so that we properly extract those fields:
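To illustrate what this kind of extraction does, the Python sketch below pulls the eight fields out of a nested record and returns a flat one. It is not a Jolt specification, and the nested key names in FIELD_PATHS are invented for illustration; the real paths depend on the structure of your JSON.

```python
# Hypothetical mapping of output field -> path inside the nested JSON record
FIELD_PATHS = {
    "event_id": ("event", "id"),
    "event_time": ("event", "time"),
    "url": ("page", "url"),
    "country": ("geo", "country"),
    "session_id": ("session", "id"),
    "user_cookie": ("user", "cookie"),
    "os": ("device", "os"),
    "browser": ("device", "browser"),
}


def extract_fields(record, field_paths=FIELD_PATHS):
    """Flatten a nested record, keeping only the fields needed for the dashboard."""
    flat = {}
    for name, path in field_paths.items():
        value = record
        for key in path:
            # A missing parent or child element yields None instead of an error
            value = value.get(key) if isinstance(value, dict) else None
        flat[name] = value
    return flat
```

Anything in the source record that is not listed in the mapping is dropped, which is the same effect the transform has in the DataFlow.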
Step 7:
Configure the PutHBaseJSON processor to store the record.
Apache Zeppelin Notebook:
09-30-2016
01:56 PM
3 Kudos
Use Apache Spark to connect to SQL Server, extract a table from SQL Server, and load the extracted rows into a Hive table:
You will need to download the JDBC driver for SQL Server. Download the driver from: https://www.microsoft.com/en-us/download/details.aspx?id=11774
Run code:
/usr/bin/spark-submit --driver-class-path /home/centos/sqljdbc_4.0/enu/sqljdbc4.jar --master yarn-client my_script.py
import os
from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext
conf = (SparkConf()
.setAppName("data_import")
.set("spark.dynamicAllocation.enabled","true")
.set("spark.shuffle.service.enabled","true"))
sc = SparkContext(conf = conf)
sqlctx = HiveContext(sc)
df = sqlctx.load(
source="jdbc",
url="jdbc:sqlserver://ec2-54-244-44-6.us-west-2.compute.amazonaws.com:1433;database=sales;user=my_username;password=my_password",
dbtable="orders")
## this is how to write to an ORC file
df.write.format("orc").save("/tmp/orc_query_output")
## this is how to write to a hive table
df.write.mode('overwrite').format('orc').saveAsTable("test_table")
08-25-2016
02:08 PM
7 Kudos
SYNOPSIS
Which is faster when analyzing data using Spark 1.6.1: HDP with HDFS for storage, or EMR?
Testing shows that HDP using HDFS has performance gains over using EMR. HDP/HDFS outperforms EMR by 46% when tested against 1 full day of 37 GB Clickstream (Web) data.
 | HDP/HDFS | EMR
Time Elapsed | 3 mins, 29 sec | 5 mins, 5 sec
* See below at end of article for validation and screen prints showing Resource Manager logs
HDP
Hortonworks Data Platform (HDP) is the industry's only true secure, enterprise-ready open source Apache Hadoop distribution. The Hadoop Distributed File System (HDFS) is a Java-based distributed block storage file system that is used to store all data in HDP.
EMR
Amazon Elastic MapReduce (Amazon EMR) is a managed Hadoop framework to distribute and process vast amounts of data across dynamically scalable Amazon EC2 instances.
S3 is an inexpensive object store that can theoretically scale out infinitely without the limitations inherent to a hierarchical block storage file system.
Objects are not stored in file systems; instead, users create objects and associate keys with them.
Object storage also has the option of tagging metadata with your data.
TEST
Spark (PySpark) using DataFrames to get a Count of Page Views by Operating System (Desktop and Mobile OS types) against a full day of Clickstream data (24 hours) and listing the top 20 most used operating systems. Ran the same Spark code against an AWS HDP cluster on EC2 instances with data stored in HDFS and against an AWS EMR cluster.
Test Data
COMPANY X is a global online marketplace connecting consumers with merchants.
Total data size is 37 GB.
1 Full day of page views data (24 hours of Clickstream logs).
22.3 Million page view records from 13 countries in North America, Latin America, and Asia.
Data is in JSON format and uncompressed.
143 files totaling 37 GB. Each file averages 256 MB.
All 143 source JSON files were placed into HDFS on HDP and into S3 on EMR.
Platform Versions
HDP 2.3.0 - Hadoop version 2.7.1
EMR 4.5.0 - Hadoop version 2.7.2
AWS HDP and EMR Clusters were sized/configured similarly
m4.2xlarge Instances
1 master and 4 worker nodes
TEST RESULTS
Spark 1.6.1 on HDP/HDFS outperformed Spark 1.6.1 on EMR by 46%
Total elapsed time for HDP/HDFS: 3 minutes 29 seconds
Total elapsed time for EMR: 5 minutes 5 seconds
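The 46% figure is consistent with expressing EMR's additional elapsed time as a percentage of the HDP/HDFS elapsed time. The short calculation below shows that reading of the numbers; it is my reconstruction of the arithmetic, not something taken from the Resource Manager logs.

```python
def to_seconds(minutes, seconds):
    """Convert an elapsed time given as minutes and seconds into seconds."""
    return minutes * 60 + seconds


hdp_seconds = to_seconds(3, 29)   # HDP/HDFS elapsed time
emr_seconds = to_seconds(5, 5)    # EMR elapsed time

# Extra time taken by EMR, relative to the HDP/HDFS run
extra_percent = (emr_seconds - hdp_seconds) / float(hdp_seconds) * 100
print(hdp_seconds, emr_seconds, round(extra_percent))
```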
TESTING VALIDATION
Sample JSON record
Total disk usage in HDFS consumed by all files is 37 G
Source data consists of 143 JSON files. Each file averages 256 MB for a total data volume of 37 GB
Output produced. Operating system and total page view count:
HDP Resource Manager log
EMR Resource Manager log