Created on 06-28-2016 08:02 PM - edited 08-17-2019 11:52 AM
This tutorial demonstrates how to use Spark for data processing operations on a large dataset consisting of pipe-delimited text files. Delimited text files are a common format in Data Warehousing; a sample record layout is sketched below.
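To make the format concrete, here is a hypothetical record (the values are invented for illustration, but the field layout matches the schema the scripts in this article map onto Row objects) and how it splits in Python:

# Hypothetical sample record -- values are made up for illustration only.
# Field order: cust_id|order_id|email_hash|ssn_hash|product_id|product_desc|country|state|shipping_carrier|shipping_type|shipping_class
sample = "1000001|96922894|a1b2c3d4|e5f6a7b8|501|Widget Deluxe|US|CA|UPS|Ground|Standard"

fields = sample.split('|')
print fields[1]   # order_id     -> '96922894'
print fields[5]   # product_desc -> 'Widget Deluxe'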
Three different techniques (RDDs, DataFrames, and SparkSQL) will be used to solve two problems against this dataset, a random lookup of a single order record and a GroupBy with Count and Sort Descending, and their performance will then be compared against each other.
The purpose of this tutorial is to provide you with code snippets for the above three techniques and to demonstrate how RDDs can outperform DataFrames and SparkSQL for certain types of data processing. All of the code is included at the end of this article.
These tests also exercise only the native functionality within Spark for RDDs, DataFrames, and SparkSQL; no additional modules or readers are called for file format conversions or other optimizations.
For joining datasets, DataFrames and SparkSQL are much more intuitive to use, especially SparkSQL, and may well yield better performance than RDDs.
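As a quick, hedged illustration of that point (the customers table and its columns are hypothetical and not part of the test dataset), a join expressed in SparkSQL reads almost like the business question itself:

## Hypothetical join sketch -- assumes an existing SQLContext and that both the
## "orders" table and a made-up "customers" table have been registered as temp tables.
joined = sqlContext.sql("""
    SELECT o.order_id, c.customer_name, o.product_desc
    FROM orders o
    JOIN customers c ON o.cust_id = c.cust_id
""")
joined.show()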
At its core, Spark operates on the concept of Resilient Distributed Datasets (RDDs): immutable, partitioned collections of records that can be processed in parallel through transformations and actions.
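A minimal sketch of working directly with an RDD (assuming an existing SparkContext named sc, as created in the scripts below):

## Minimal RDD sketch -- assumes an existing SparkContext named sc.
rdd = sc.parallelize([("widget", 3), ("gadget", 5), ("widget", 4)])

## transformations are lazy; nothing executes until an action such as collect()
totals = rdd.reduceByKey(lambda a, b: a + b)
print totals.collect()   # e.g. [('widget', 7), ('gadget', 5)]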
The DataFrames API is a data abstraction built on top of RDDs that organizes your data into named columns, much like a table in a relational database.
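A minimal sketch of the same idea through the DataFrames API (again assuming an existing SparkContext named sc; this mirrors how the scripts below build a SQLContext and Rows):

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)   # assumes an existing SparkContext named sc

## each Row gets named columns, so operations can refer to fields by name
df = sqlContext.createDataFrame(sc.parallelize(
        [Row(product="widget", qty=3), Row(product="gadget", qty=5)]))
df.groupBy("product").count().show()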
SparkSQL is a Spark module for structured data processing. You can interact with SparkSQL through SQL queries, the DataFrames API, and the Datasets API.
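A minimal sketch of the SQL route (assuming an existing SparkContext named sc; the temp-table registration is the same pattern the SparkSQL scripts below use):

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)   # assumes an existing SparkContext named sc
df = sqlContext.createDataFrame(sc.parallelize(
        [Row(product="widget", qty=3), Row(product="gadget", qty=5)]))

## register the DataFrame as a temporary table, then query it with plain SQL
df.registerTempTable("products")
print sqlContext.sql("SELECT product, SUM(qty) AS total_qty FROM products GROUP BY product").collect()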
RDD Random Lookup
#!/usr/bin/env python

from time import time
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("rdd_random_lookup")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## filter where the order_id, the second field, is equal to 96922894
print lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect()

tt = str(time() - t0)
print "RDD lookup performed in " + tt + " seconds"
DataFrame Random Lookup
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("data_frame_random_lookup")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))

## filter where the order_id, the second field, is equal to 96922894
orders_df.where(orders_df['order_id'] == 96922894).show()

tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds"
SparkSQL Random Lookup
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("spark_sql_random_lookup")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))

## register data frame as a temporary table
orders_df.registerTempTable("orders")

## filter where the order_id, the second field, is equal to 96922894
print sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect()

tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds"
RDD with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("rdd_aggregation_and_sort")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## count records per product_desc (the sixth field), then sort by count descending
counts = lines.map(lambda line: line.split('|')) \
              .map(lambda x: (x[5], 1)) \
              .reduceByKey(lambda a, b: a + b) \
              .map(lambda x: (x[1], x[0])) \
              .sortByKey(ascending=False)

for x in counts.collect():
    print x[1] + '\t' + str(x[0])

tt = str(time() - t0)
print "RDD GroupBy performed in " + tt + " seconds"
DataFrame with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("data_frame_aggregation_and_sort")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))

results = orders_df.groupBy(orders_df['product_desc']).count().sort("count", ascending=False)

for x in results.collect():
    print x

tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds"
SparkSQL with GroupBy, Count, and Sort Descending
#!/usr/bin/env python

from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("spark_sql_aggregation_and_sort")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", 2)
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

t0 = time()

path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
orders_df = sqlContext.createDataFrame(lines.map(lambda l: l.split("|"))
                                            .map(lambda r: Row(product=r[5])))

## register data frame as a temporary table
orders_df.registerTempTable("orders")

results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")

for x in results.collect():
    print x

tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds"
Source data files in HDFS:
Total size of source data files in HDFS:
The order ID is the second field in the pipe-delimited file. There are 9 million unique order ID records.
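For reference, a quick way to verify that distinct count (a sketch assuming the same SparkContext and path used in the scripts above):

## Verification sketch: count distinct order IDs (the second pipe-delimited field).
lines = sc.textFile("/data/customer_orders*")
print lines.map(lambda line: line.split('|')[1]).distinct().count()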
Output produced by GroupBy, Count, and Sort Descending (the output format differs between the three techniques, but the numbers are the same):
Created on 06-30-2016 07:08 AM
Very interesting, but I think it would have been a more even comparison if you had used the Databricks CSV reader (spark-csv) to read the file for the DataFrame and SparkSQL tests; otherwise there is the overhead of converting the RDD to a DataFrame...
Created on 10-13-2016 06:34 PM
Can we say this difference is only due to the conversion from RDD to DataFrame?
Because, as per the Apache documentation, DataFrames have in-memory columnar storage and a query optimizer, which should let them outperform RDDs.
I believe that if the source were a JSON file, we could read it directly into a DataFrame, and it would likely perform well compared to an RDD.
And why does SparkSQL perform better than the DataFrame in the grouping test? DataFrames and SparkSQL should be compiled down to similar RDD code and use the same optimizer.
Created on 05-04-2018 07:53 PM
Is this still valid? I mean, there have been many improvements to Spark SQL and the Catalyst engine since Spark 1.6.
Can you please rerun your tests?
Created on 02-21-2020 10:03 AM
Is the input dataset available somewhere?