The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, geospatial data, and more. ETL (Extract-Transform-Load) is a process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast, general-purpose distributed computation engine for fault-tolerant parallel data processing, which makes it an excellent choice for ETL.
This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.
Pipe (|) delimited text files stored in HDFS containing information about customer orders:
Customer table in MySQL containing information about customers:
We need to join the data from the orders files in HDFS with the customer data in MySQL to produce a new file in HDFS that shows each customer's name alongside their orders. The new file will be stored in ORC format. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, the output will look like the screenshot below, but this is not how the data is stored; I'm only showing it to visualize the joined data set. ORC uses columnar storage, not row storage.
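To make the row versus columnar distinction concrete, here is a minimal plain-Python sketch. The rows and names are hypothetical values made up for illustration, and the layout is heavily simplified: real ORC files group rows into stripes and add compression and indexes on top of this idea.

```python
# Hypothetical joined rows (first_name, last_name, order_number, total).
# These values are invented for illustration only.
column_names = ["first_name", "last_name", "order_number", "total"]
rows = [
    ("Ann", "Lee", "1001", "25.00"),
    ("Bob", "Ray", "1002", "40.00"),
    ("Ann", "Lee", "1003", "15.50"),
]

# Row-oriented layout: the fields of each record are kept together.
row_layout = [dict(zip(column_names, row)) for row in rows]

# Column-oriented layout: all values of one column are kept together,
# which is the general idea behind columnar formats such as ORC.
column_layout = {
    name: [row[i] for row in rows] for i, name in enumerate(column_names)
}

# Reading one column only touches that column's values.
totals = column_layout["total"]
print(totals)
```

Keeping similar values next to each other is also part of why columnar formats tend to compress well.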
The code below was tested using Spark 2.1. Beginning in Spark 2.0 the SQLContext and HiveContext have been replaced with SparkSession; however, both SQLContext and HiveContext are preserved for backwards compatibility. Notice in my code below that I'm using SparkSession.
To run the code below you will need the CSV parsing library from Databricks: https://github.com/databricks/spark-csv
If you're working with large data volumes, I recommend using the CSV parsing library from Databricks, as it will be much faster than using a typical lambda function to parse fields out of a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.h...
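For readers unfamiliar with the "typical lambda function" approach, the sketch below shows what it looks like in plain Python next to a real CSV parser. The sample lines are hypothetical, and Python's standard `csv` module stands in here for a proper parser; this illustrates the difference in parsing behavior only, not the performance difference discussed above.

```python
import csv

# Hypothetical pipe-delimited lines; all values are made up for illustration.
lines = [
    "order_number|customer_number|order_quantity|unit_price|total",
    "1001|C1|2|12.50|25.00",
    '1002|"C|2"|1|40.00|40.00',  # quoted field that contains the delimiter
]

# The "typical lambda" approach: split every line on the pipe character.
# In PySpark this would look like: sc.textFile(path).map(split_line)
split_line = lambda line: line.split("|")
naive = [split_line(line) for line in lines]

# A CSV parser honours quoting, so the quoted pipe stays inside one field.
parsed = list(csv.reader(lines, delimiter="|"))

print(len(naive[2]), naive[2])
print(len(parsed[2]), parsed[2])
```

The naive split breaks the quoted field into two pieces, while the parser returns the expected five fields.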
Run the code using:
spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
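The `--packages` option takes a comma-separated list of Maven coordinates in `groupId:artifactId:version` form; the `_2.11` suffix on the spark-csv artifact is the Scala version it was built for. As a small sketch, the command above can be assembled from its parts like this (the helper name `build_spark_submit` is hypothetical, not part of Spark):

```python
import shlex

# Maven coordinates taken from the command above (groupId:artifactId:version).
packages = [
    "com.databricks:spark-csv_2.11:1.4.0",
    "mysql:mysql-connector-java:5.1.38",
]


def build_spark_submit(script, packages, master="yarn", deploy_mode="cluster"):
    """Return the spark-submit argument list for a PySpark script.

    Hypothetical helper for illustration; it only builds the arguments
    and does not launch anything.
    """
    return [
        "spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--packages", ",".join(packages),
        script,
    ]


argv = build_spark_submit("my_script.py", packages)
command = " ".join(shlex.quote(arg) for arg in argv)
print(command)
```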
from pyspark.sql import SparkSession
from pyspark.sql.types import *

## create a SparkSession
spark = SparkSession \
    .builder \
    .appName("join_file_with_mysql") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "5") \
    .getOrCreate()

## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])

## create a DataFrame for the orders files stored in HDFS
df_orders = spark.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', delimiter='|') \
    .load('hdfs:///orders_file/', schema=customSchema)

## register the orders data as a temporary table
df_orders.registerTempTable("orders")

## create a DataFrame for the MySQL customer table
df_customers = spark.read.format("jdbc").options(
    url="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
    driver="com.mysql.jdbc.Driver",
    dbtable="customer",
    user="root",
    password="password"
).load()

## register the customers data as a temporary table
df_customers.registerTempTable("customers")

## join the DataSets
sql = '''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
'''
output = spark.sql(sql)

## save the data into an ORC file
output.write.format("orc").save("/tmp/customer_orders")
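The join query uses the implicit comma-join syntax with a WHERE clause, which behaves as an inner join: orders without a matching customer are dropped. One way to sanity-check that logic without a cluster is to run the same SQL against a small in-memory SQLite database. This is a sketch using SQLite in place of Spark SQL; the rows are hypothetical, and the customers table is assumed to contain only the three columns the query references.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (customer_number TEXT, first_name TEXT, last_name TEXT);
CREATE TABLE orders (order_number TEXT, customer_number TEXT,
                     order_quantity TEXT, unit_price TEXT, total TEXT);
""")

# Hypothetical sample rows, made up for illustration.
conn.executemany("INSERT INTO customers VALUES (?, ?, ?)", [
    ("C1", "Ann", "Lee"),
    ("C2", "Bob", "Ray"),
    ("C3", "Cy", "Orr"),   # customer with no orders
])
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?)", [
    ("1001", "C1", "2", "12.50", "25.00"),
    ("1002", "C2", "1", "40.00", "40.00"),
    ("1003", "C1", "1", "15.50", "15.50"),
    ("1004", "C9", "1", "5.00", "5.00"),   # order with no matching customer
])

# Same statement as in the article: implicit join with a WHERE clause.
implicit_join = """
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
"""

# Equivalent explicit form, which some readers find easier to follow.
explicit_join = """
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a
INNER JOIN orders b ON a.customer_number = b.customer_number
"""

implicit_rows = sorted(conn.execute(implicit_join).fetchall())
explicit_rows = sorted(conn.execute(explicit_join).fetchall())
conn.close()

for row in implicit_rows:
    print(row)
```

Both forms return the same three rows here; the unmatched order and the customer without orders do not appear in the output.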