BACKGROUND

The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, geospatial data, and more. ETL (Extract-Transform-Load) is the process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast, general-purpose distributed computation engine for fault-tolerant parallel data processing. Spark is an excellent choice for ETL:

  • Works with a myriad of data sources: files, RDBMSs, NoSQL, Parquet, Avro, JSON, XML, and many more.
  • Write your ETL code using Java, Scala, or Python.
  • In-memory computing for fast data processing.
  • APIs to easily create schemas for your data and perform SQL computations.
  • Distributed computing and fault tolerance are built into the framework and abstracted from the end user.

This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.

SOURCE DATA

Pipe (|) delimited text files stored in HDFS containing information about customer orders:

[Screenshot: sample pipe-delimited order records]

Customer table in MySQL containing information about customers:

[Screenshot: sample rows from the MySQL customer table]

We need to join the data from the orders files in HDFS with the customer data in MySQL to produce a new file in HDFS that maps customer names to their orders. The new file will be stored as an ORC formatted file. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, the output will look like the screenshot below, but this is not how the data is actually stored; it is shown only to visualize the joined data set, since ORC uses columnar rather than row storage.

[Screenshot: conceptual view of the joined customer and order data]

CODE

The code below was tested using Spark 2.1. Beginning in Spark 2.0, SQLContext and HiveContext were replaced by SparkSession; however, both are preserved for backwards compatibility. Notice that the code below uses SparkSession.
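
For example, older code written against the pre-2.0 entry point still runs unchanged. The minimal sketch below (not part of the ETL script; names are illustrative) shows the two entry points side by side:

from pyspark.sql import SparkSession, SQLContext

## Spark 2.x entry point
spark = SparkSession.builder.appName("compat_check").getOrCreate()

## pre-2.0 style entry point; still works for backwards compatibility,
## but SparkSession is the preferred API going forward
sqlContext = SQLContext(spark.sparkContext)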

To run the code below you will need the CSV parsing library from Databricks: https://github.com/databricks/spark-csv

If you're working with large data volumes, then I recommend using the CSV parsing library from Databricks, as it will be much faster than a typical lambda function for parsing fields out of a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.h...
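
For reference, the slower lambda-based approach looks roughly like the sketch below. It is shown only for comparison and is not part of the script; it assumes the same orders path and the customSchema defined later in this article, and unlike the CSV library it does not handle the header row or quoted fields:

## RDD + lambda parsing, for comparison only
rdd = spark.sparkContext.textFile('hdfs:///orders_file/')
parsed = rdd.map(lambda line: line.split('|'))
df_orders_rdd = spark.createDataFrame(parsed, schema=customSchema)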

Run the code using:

spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
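
If you want to smoke-test the script on a single machine before submitting it to YARN, the same packages also work in local mode (an optional variant, not from the original article):

spark-submit --master local[*] --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py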

from pyspark.sql import SparkSession
from pyspark.sql.types import *

## create a SparkSession
spark = SparkSession \
    .builder \
    .appName("join_file_with_mysql") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "5") \
    .getOrCreate()

## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])

## create a DataFrame for the orders files stored in HDFS
df_orders = spark.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', delimiter='|') \
    .load('hdfs:///orders_file/', schema=customSchema)
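
## optional sanity check (not part of the original script): confirm the
## schema and a few rows loaded as expected
# df_orders.printSchema()
# df_orders.show(5, truncate=False)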

## register the orders data as a temporary table
df_orders.registerTempTable("orders")

## create a DataFrame for the MySQL customer table
df_customers = spark.read.format("jdbc").options(
    url ="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
    driver="com.mysql.jdbc.Driver",
    dbtable="customer",
    user="root",
    password="password"
).load()
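
## optional sanity check (not part of the original script): confirm the JDBC
## connection works and a few customer rows come back
# df_customers.printSchema()
# df_customers.show(5)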

## register the customers data as a temporary table
df_customers.registerTempTable("customers")

## join the DataSets
sql='''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
'''

output = spark.sql(sql)
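
## equivalent join using the DataFrame API instead of SQL (an alternative,
## not part of the original script); both forms produce the same result
# output = df_customers.join(df_orders, "customer_number") \
#     .select("first_name", "last_name", "order_number", "total")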

## save the data into an ORC file
output.write.format("orc").save("/tmp/customer_orders")
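
To verify the result, you can read the ORC output back into a DataFrame and inspect a few rows. This is an optional check (not from the original article), assuming the same /tmp/customer_orders path:

## read the ORC output back and display a sample of the joined rows
verify = spark.read.format("orc").load("/tmp/customer_orders")
verify.show(10)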

 
