Member since 02-26-2016 | 100 Posts | 111 Kudos Received | 12 Solutions
03-10-2022
11:22 PM
I have a Hive table with 16 billion rows, and data is inserted into it daily as Parquet with Snappy compression. Can you help me optimize the data ingestion?
03-17-2017
03:44 PM
3 Kudos
This tutorial will demonstrate how to execute PySpark jobs on an HDP cluster and pass in parameter values using the Livy REST interface. I will also demonstrate how to interact with Livy via Apache Zeppelin and use forms in Zeppelin to pass in parameter values. Livy is an open source REST interface for interacting with Spark.
Livy requires at least Spark 1.6.
By default, Livy is built against Apache Spark 1.6.2 with HDP 2.5.3.
If you are on HDP 2.4.2, Livy 0.2.0 may be included in the HDP 2.4.2 repos. If it is not, it can be cloned from: https://github.com/cloudera/livy
The following was tested with HDP 2.5.3 and Spark 1.6.2.
Business Use Case:
A PySpark script that resides on the Livy server aggregates order statistics from a Hive table for a date range passed in as parameters to the Python script.
The Start and End dates are passed in as parameter values to the Python script.
The job runs on the cluster via YARN and produces the following output:
Start Date | End Date | City | Min Order Amt | Max Order Amt | Avg Order Amt
2017-02-06 | 2017-02-15 | Chicago | 54.23 | 788.23 | 354.23
2017-02-06 | 2017-02-15 | San Francisco | 98.23 | 1032.12 | 634.12
Users can execute the PySpark script via Livy using curl from a terminal window, Apache Zeppelin, or other applications.
Execute PySpark script on HDP cluster via Livy using curl
from pyspark import SparkContext
from pyspark.sql import HiveContext
import sys
# start and end dates arrive as positional arguments (the "args" list in the Livy request)
start_date = sys.argv[1]
end_date = sys.argv[2]
sc = SparkContext('yarn-client')
sqlContext = HiveContext(sc)
# aggregate order amounts per city for the requested date range
sql = """
select '%s' as start_date, '%s' as end_date, city, min(order_amt) as min_order_amount,
max(order_amt) as max_order_amount, avg(order_amt) as average_order_amount
from orders where to_date(order_date) between '%s' and '%s'
group by city order by city
""" % (start_date, end_date, start_date, end_date)
sqlContext.sql(sql).show()
How to call the PySpark script:
The PySpark script is a regular file on the Livy server. Notice that I’m setting the location of the file with ‘file:’.
Livy supports impersonation, which creates a session running as a different user; this is the ‘proxyUser’ setting. Notice that I’m running the script as ‘test_livy_impersonation’.
Spark properties such as the number of executors, number of cores, and executor memory can be set when the job is created, in the same way as the corresponding spark-submit options. Notice that I’m setting the number of executor cores to 4.
I’m passing parameters to the script by setting values for ‘args’.
Command to run script:
curl -X POST -H "Content-Type: application/json" ec2-xxxxxx.compute.amazonaws.com:8998/batches --data '{"file": "file:/tmp/test.py", "name": "Livy PySpark Example", "executorCores":4, "proxyUser": "test_livy_impersonation","args":["2017-01-15","2017-01-19"]}' | python -m json.tool
*Note: The default Livy port is 8998.
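If you would rather submit the batch from Python than from curl, the sketch below sends the same JSON body to Livy's POST /batches endpoint using only the standard library. The host in LIVY_URL is a hypothetical placeholder, and build_batch_request and submit_batch are illustrative helper names, not part of Livy. Other Livy batch fields such as numExecutors, executorMemory, or conf could be added to the dictionary the same way.

```python
import json
import urllib.request

# Hypothetical placeholder: replace with your own Livy host (default port 8998).
LIVY_URL = "http://livy-host.example.com:8998"


def build_batch_request(file_path, start_date, end_date,
                        proxy_user="test_livy_impersonation", executor_cores=4):
    """Build the JSON body for POST /batches, mirroring the curl example."""
    return {
        "file": file_path,
        "name": "Livy PySpark Example",
        "executorCores": executor_cores,
        "proxyUser": proxy_user,
        "args": [start_date, end_date],
    }


def submit_batch(body, livy_url=LIVY_URL):
    """POST the batch to Livy and return the decoded JSON response."""
    req = urllib.request.Request(
        livy_url + "/batches",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, submit_batch(build_batch_request("file:/tmp/test.py", "2017-01-15", "2017-01-19")) returns the parsed response; its "id" field is the ID to use when fetching the log.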
After executing the above command, you will get the following output, which gives you the session ID:
Get output:
Depending on the complexity of the processing and the availability of resources on the cluster, the job may take some time to complete.
Take the session ID from the output above and run the following command to get the results from the log file (the example below uses session ID 35):
curl ec2-xxxxxx.us-east-2.compute.amazonaws.com:8998/batches/35/log | python -m json.tool
*Note: Replace 35 above with your session ID
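Because the job may take a while, a script can poll the batch state before fetching the log. This is a sketch assuming Livy's GET /batches/{id}/state and GET /batches/{id}/log endpoints; the set of terminal states is an assumption that may differ between Livy versions, and the fetch and sleep parameters exist only so the logic can be exercised without a live server.

```python
import json
import time
import urllib.request

# Assumed terminal batch states; "killed" only exists in newer Livy releases.
TERMINAL_STATES = {"success", "dead", "error", "killed"}


def get_json(url):
    """GET a Livy REST endpoint and return the decoded JSON body."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_for_batch(livy_url, batch_id, fetch=get_json, poll_seconds=10.0,
                   sleep=time.sleep):
    """Poll GET /batches/{id}/state until the batch reaches a terminal state."""
    while True:
        state = fetch("%s/batches/%d/state" % (livy_url, batch_id))["state"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)


def fetch_log(livy_url, batch_id, fetch=get_json):
    """Return the log lines from GET /batches/{id}/log."""
    return fetch("%s/batches/%d/log" % (livy_url, batch_id))["log"]
```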
Results from the log file:
Using Zeppelin with Livy and passing in parameter values via Zeppelin forms
At a minimum, configure Zeppelin’s Livy server interpreter and set ‘zeppelin.livy.url’ to your Livy server:
Also, set livy.spark.master to YARN if you want jobs to run on YARN:
If your cluster is Kerberized, then you will need to set values for ‘zeppelin.livy.keytab’ and ‘zeppelin.livy.principal’:
Sample output from my Hive table using Spark Sql interpreter (%sql):
You can access the data in this Hive table using PySpark via the Livy interpreter (%livy.pyspark) with input forms that pass parameter values to your PySpark script:
In your PySpark code, enclose parameters in ‘${parameter_name}’. In the example below, I am passing in the start date and end date as ${start_date} and ${end_date}.
Input boxes will appear in which you can enter the values.
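Conceptually, Zeppelin replaces each ${...} placeholder with the value typed into the form before the paragraph text is sent to the interpreter. Python's string.Template happens to use the same ${name} syntax, so it can illustrate that substitution. This is only an analogy, not Zeppelin's implementation: it does not handle Zeppelin extras such as default values (${name=default}), and the query text and dates are hypothetical.

```python
from string import Template

# Paragraph text as typed in Zeppelin, with form placeholders (hypothetical query).
paragraph = Template(
    "df = sqlContext.sql(\"select city, avg(order_amt) as average_order_amount "
    "from orders where to_date(order_date) between '${start_date}' and '${end_date}' "
    "group by city\")\n"
    "df.show()"
)

# Values the user typed into the generated input boxes.
form_values = {"start_date": "2017-02-06", "end_date": "2017-02-15"}

# The substituted text is what actually gets executed; note it is not escaped.
code_sent_to_livy = paragraph.substitute(form_values)
print(code_sent_to_livy)
```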
This also works with the Spark SQL (%sql) interpreter:
02-09-2017
03:30 AM
2 Kudos
BACKGROUND
The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, Geospatial data, and more. ETL (Extract-Transform-Load) is a process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast, general-purpose distributed computation engine for fault-tolerant parallel data processing. Spark is an excellent choice for ETL:
Works with a myriad of data sources: files, RDBMSs, NoSQL, Parquet, Avro, JSON, XML, and many more.
Write your ETL code using Java, Scala, or Python.
In-memory computing for fast data processing.
APIs to easily create schemas for your data and perform SQL computations.
Distributed computing and fault tolerance are built into the framework and abstracted from the end user.
This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.
SOURCE DATA
Pipe (|) delimited text files stored in HDFS containing information about customer orders:
Customer table in MySQL containing information about customers:
We need to join the data from the orders files in HDFS with the customer data in MySQL to produce a new file in HDFS that shows customer names alongside their orders. The new file will be stored as an ORC formatted file. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, the output will look like the screenshot below, but this is not how the data is stored; I'm only showing it to visualize the joined data set. ORC is columnar storage, not row storage.
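To make the row-versus-column distinction concrete, the sketch below pivots a few hypothetical joined rows into one list per column. It only illustrates the idea behind columnar layouts (a query that reads one column touches one contiguous list, and similar values stored together compress well); it does not implement the ORC format.

```python
# Hypothetical joined rows in a row-oriented layout: one record per order.
rows = [
    {"first_name": "Ana", "last_name": "Lopez", "order_number": "1001", "total": "25.00"},
    {"first_name": "Raj", "last_name": "Patel", "order_number": "1002", "total": "40.50"},
]


def to_columns(rows):
    """Pivot a list of row dicts into one list of values per column."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)
# A query that only needs "total" scans a single list instead of every field of every row.
print(columns["total"])  # ['25.00', '40.50']
```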
CODE
The code below was tested using Spark 2.1. Beginning in Spark 2.0 the SQLContext and HiveContext have been replaced with SparkSession; however, both SQLContext and HiveContext are preserved for backwards compatibility. Notice in my code below that I'm using SparkSession.
To run the code below, you will need the CSV parsing library from Databricks: https://github.com/databricks/spark-csv
If you're working with large data volumes, I recommend the Databricks CSV parsing library, as it will be much faster than using a typical lambda function to parse fields out of a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.html
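For contrast, the hand-rolled approach the library replaces looks roughly like this: split each line on the delimiter and map the values to field names. This is a hypothetical sketch, not code from this article. In an RDD-based job it would be applied with something like sc.textFile('hdfs:///orders_file/').map(parse_order_line), and it ignores header rows and quoted fields, which the CSV library handles for you.

```python
# Field order assumed to match the customSchema used in the article's code.
ORDER_FIELDS = ("order_number", "customer_number", "order_quantity", "unit_price", "total")


def parse_order_line(line, delimiter="|"):
    """Split one pipe-delimited order record into a dict keyed by field name."""
    values = line.rstrip("\n").split(delimiter)
    if len(values) != len(ORDER_FIELDS):
        raise ValueError("expected %d fields, got %d: %r"
                         % (len(ORDER_FIELDS), len(values), line))
    return dict(zip(ORDER_FIELDS, values))


sample = "1001|C17|2|12.50|25.00"  # hypothetical record
print(parse_order_line(sample))
```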
Run the code using:
spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
## create a SparkSession
spark = (SparkSession.builder
         .appName("join_file_with_mysql")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.executor.cores", "5")
         .getOrCreate())
## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])
## create a DataFrame for the orders files stored in HDFS
df_orders = (spark.read
             .format('com.databricks.spark.csv')
             .options(header='true', delimiter='|')
             .load('hdfs:///orders_file/', schema=customSchema))
## register the orders data as a temporary view
## (createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0)
df_orders.createOrReplaceTempView("orders")
## create a DataFrame for the MySQL customer table
## (replace the URL, user, and password with your own connection details)
df_customers = spark.read.format("jdbc").options(
    url="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
    driver="com.mysql.jdbc.Driver",
    dbtable="customer",
    user="root",
    password="password"
).load()
## register the customers data as a temporary view
df_customers.createOrReplaceTempView("customers")
## join the DataFrames on customer_number
sql = '''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a
JOIN orders b ON a.customer_number = b.customer_number
'''
output = spark.sql(sql)
## save the joined data as ORC files
output.write.format("orc").save("/tmp/customer_orders")
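The SQL above is an inner join on customer_number. The sketch below shows the result such a join produces, using a simple hash join over hypothetical in-memory rows: customers are indexed by key, then each order probes the index. Spark picks its own physical join strategy, so this only illustrates the semantics; it also assumes customer_number is unique in the customer table.

```python
# Hypothetical sample rows standing in for the MySQL table and the HDFS files.
customers = [
    {"customer_number": "C1", "first_name": "Ana", "last_name": "Lopez"},
    {"customer_number": "C2", "first_name": "Raj", "last_name": "Patel"},
]
orders = [
    {"order_number": "1001", "customer_number": "C1", "total": "25.00"},
    {"order_number": "1002", "customer_number": "C1", "total": "40.50"},
    {"order_number": "1003", "customer_number": "C9", "total": "9.99"},
]


def inner_join(customers, orders):
    """Return (first_name, last_name, order_number, total) for matching keys."""
    # Build phase: index customers by the join key.
    by_number = {c["customer_number"]: c for c in customers}
    # Probe phase: emit one row per order whose customer exists.
    result = []
    for o in orders:
        c = by_number.get(o["customer_number"])
        if c is not None:
            result.append((c["first_name"], c["last_name"], o["order_number"], o["total"]))
    return result


for row in inner_join(customers, orders):
    print(row)
```

Order 1003 is dropped because no customer C9 exists, and customer C2 does not appear because it has no orders; that is exactly what an inner join keeps and discards.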
10-25-2017
05:03 PM
One thing I wish I had known when starting with Python UDFs is that you can write to stderr to help with debugging, and then look for that output in the logs via the YARN ResourceManager:
import sys
sys.stderr.write('>>>> Read a line \n' + line + '\n')
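A slightly fuller sketch of the same idea, assuming a Hive TRANSFORM-style streaming script that receives tab-separated rows on stdin (the comment does not say which engine the UDF runs in). The upper-casing step is a hypothetical placeholder transformation; the point is that debug text goes to stderr while result rows go to stdout.

```python
import sys


def process(lines, write_out, write_err):
    """Transform tab-separated rows, logging each input line to stderr."""
    for line in lines:
        line = line.rstrip("\n")
        # stderr ends up in the task's container logs; stdout is reserved
        # for the rows handed back to the query engine.
        write_err(">>>> Read a line\n" + line + "\n")
        fields = line.split("\t")
        fields[0] = fields[0].upper()  # hypothetical transformation
        write_out("\t".join(fields) + "\n")


def main():
    """Wire the transformation to the real standard streams."""
    process(sys.stdin, sys.stdout.write, sys.stderr.write)
```

In the deployed script, call main() at the end so it reads from the real standard streams.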
06-28-2017
09:19 AM
Hi, thanks for the tutorial. Do you know if it is possible to use "StoreInKiteDataset" with Kerberos to write to HDFS?
08-12-2018
01:50 PM
Great post, Binu! What storage format would you suggest if you plan on loading the Hive table into a DataFrame and running an iterative process (machine learning algorithm X) against the data? I’m hard-pressed to find any discussion of this concept.
12-31-2016
03:25 AM
@Binu Mathew: Thanks for sharing this awesome article. Would you mind sharing the sample data?
07-10-2018
10:42 AM
Will this work with Spark 2.3.0? Could you please update it for that version?
08-25-2016
02:08 PM
7 Kudos
SYNOPSIS
Which is faster when analyzing data using Spark 1.6.1: HDP with HDFS for storage, or EMR?
Testing shows that HDP using HDFS performs better than EMR. When tested against 1 full day (37 GB) of clickstream (web) data, HDP/HDFS outperformed EMR by 46%; that is, EMR took about 46% longer to complete the same job.
Platform | Time Elapsed
HDP/HDFS | 3 min 29 sec
EMR | 5 min 5 sec
* See the end of the article for validation and screen prints of the Resource Manager logs.
HDP
Hortonworks Data Platform (HDP) is the industry's only true secure, enterprise-ready open source Apache Hadoop distribution. The Hadoop Distributed File System (HDFS) is a Java-based distributed block storage file system that is used to store all data in HDP.
EMR
Amazon Elastic MapReduce (Amazon EMR) is a managed Hadoop framework to distribute and process vast amounts of data across dynamically scalable Amazon EC2 instances.
S3 is an inexpensive object store that can theoretically scale out infinitely without the limitations inherent to a hierarchical block storage file system.
Objects are not stored in file systems; instead, users create objects and associate keys with them.
Object storage also has the option of tagging metadata with your data.
TEST
A Spark (PySpark) job using DataFrames counts page views by operating system (desktop and mobile OS types) across a full day (24 hours) of clickstream data and lists the top 20 most used operating systems. The same Spark code was run against an AWS HDP cluster on EC2 instances with data stored in HDFS, and against an AWS EMR cluster.
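The Spark code used for the test is not included here. As a rough, hypothetical sketch of the computation only, the function below counts page views per operating system in newline-delimited JSON records and returns the top 20; the field name os is an assumption about the record layout. With PySpark DataFrames, the equivalent would be along the lines of df.groupBy("os").count().orderBy("count", ascending=False).limit(20).

```python
import json
from collections import Counter


def top_operating_systems(json_lines, field="os", n=20):
    """Count page views per operating system and return the n most common."""
    counts = Counter()
    for line in json_lines:
        record = json.loads(line)
        # Records without the (assumed) OS field are counted as "unknown".
        counts[record.get(field, "unknown")] += 1
    return counts.most_common(n)
```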
Test Data
COMPANY X is a global online marketplace connecting consumers with merchants. The test data is 1 full day (24 hours) of page-view clickstream logs: 22.3 million page-view records from 13 countries in North America, Latin America, and Asia. The data is uncompressed JSON, split across 143 files averaging 256 MB each, for a total of 37 GB. All 143 source JSON files were placed into HDFS on HDP and into S3 on EMR.
Platform Versions
HDP 2.3.0 - Hadoop version 2.7.1
EMR 4.5.0 - Hadoop version 2.7.2
The AWS HDP and EMR clusters were sized and configured similarly:
m4.2xlarge instances, 1 master and 4 worker nodes
TEST RESULTS
Spark 1.6.1 on HDP/HDFS outperformed Spark 1.6.1 on EMR by 46%.
Total elapsed time for HDP/HDFS: 3 minutes 29 seconds
Total elapsed time for EMR: 5 minutes 5 seconds
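The 46% figure is relative to the HDP/HDFS time. The arithmetic below converts both elapsed times to seconds and shows the two ways of expressing the same gap.

```python
# Elapsed times from the test results, converted to seconds.
hdp_seconds = 3 * 60 + 29   # 209 s
emr_seconds = 5 * 60 + 5    # 305 s

# How much longer EMR took, relative to HDP/HDFS (the 46% figure).
emr_slower_pct = (emr_seconds - hdp_seconds) / hdp_seconds * 100
# How much less time HDP/HDFS took, relative to EMR.
hdp_time_saved_pct = (emr_seconds - hdp_seconds) / emr_seconds * 100

print("EMR took %.1f%% longer; HDP/HDFS took %.1f%% less time"
      % (emr_slower_pct, hdp_time_saved_pct))
# prints: EMR took 45.9% longer; HDP/HDFS took 31.5% less time
```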
TESTING VALIDATION
Sample JSON record
Total disk usage in HDFS consumed by all files is 37 GB
Source data consists of 143 JSON files. Each file averages 256 MB for a total data volume of 37 GB
Output produced. Operating system and total page view count:
HDP Resource Manager log
EMR Resource Manager log