Created on 08-22-2016 08:27 PM
Synopsis
Both Pig and Spark (PySpark) excel at iterative data processing against weblog data in delimited text format.
Is one faster than the other?
- Pig on Tez outperforms plain PySpark
- PySpark using the Databricks CSV parser outperforms Pig on Tez
Pig
- Apache Pig allows users to write MapReduce transformations using a simple scripting language called Pig Latin.
- Pig translates the Pig Latin script into MapReduce jobs so that they can be executed within YARN against a dataset stored in the Hadoop Distributed File System (HDFS).
- Apache Tez is a distributed data processing framework for building batch and interactive applications.
- Tez improves the MapReduce paradigm by dramatically improving its speed, while maintaining MapReduce’s ability to scale to petabytes of data.
- Pig on Tez excels at iterative data processing. It is fast, easy to use, and a strong fit for parsing weblogs (see the engine-selection sketch below).
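As a point of reference, the same Pig Latin script can be pointed at either execution engine from the command line. A minimal sketch, assuming Pig 0.14+ with Tez installed on the cluster; the script file name is illustrative:

# run the script on the Tez execution engine
pig -x tez parse_weblogs.pig

# run the same script on classic MapReduce for comparison
pig -x mapreduce parse_weblogs.pig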
Spark
- General purpose in-memory distributed data processing engine
- APIs to write code in Java, Scala, or Python
- Libraries for SQL, Streaming, Machine Learning, and Graph Processing
- Excellent for iterative data processing and a good choice for parsing weblogs
Test
- We worked with a customer parsing tab-delimited text weblog data consisting of 11 fields. Sample record:
2015-02-12 15:00:26 198.102.62.250 GET /origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/World_Street_Map/MapServer/tile/6.386012949206202/50/25 200 2649 0 "http://www.DOMAIN.com" "Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -
- The customer's Pig script uses regular expressions to parse out the individual fields
- We wrote a PySpark script to parse out the weblog data
- We wrote another PySpark script that uses the Databricks CSV parsing library to parse out the weblog data
Test Results
| Data Volume | Pig on Tez | PySpark | PySpark using Databricks CSV parsing library |
| --- | --- | --- | --- |
| 75 million records across 15 files in HDFS, totaling 21.5 GB | 1 min, 51 sec | 2 min, 13 sec | 1 min, 21 sec |
| 150 million records across 30 files in HDFS, totaling 43.0 GB | 2 min, 53 sec | 3 min, 43 sec | 2 min, 2 sec |
Conclusion
- Pig on Tez wins over plain PySpark and uses only native, out-of-the-box functionality.
- PySpark using the Databricks CSV parsing library beats Pig on Tez; however, this is not native functionality and requires the overhead of maintaining an external JAR file developed by a third party.
Code
The Pig script, the PySpark script, and the PySpark script using the Databricks CSV parsing library are all below.
Pig
TEXT_LOGS = load '/data/weblogs/*.dat' using TextLoader AS (line:chararray);

set job.name 'parse_weblogs_pig';

-- keep only lines that start with a word character (drop blank or garbage lines)
VALID_LOGS = FILTER TEXT_LOGS by line matches '^\\s*\\w.*';

-- pull the 11 fields out of each line with a single regular expression
LOGS = FOREACH VALID_LOGS GENERATE FLATTEN (
    REGEX_EXTRACT_ALL(line, '^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+"([^"]*)"\\s+([-]|"[^"]*")\\s+"?([^"]*)"?')
) AS (
    event_date:   chararray,
    event_time:   chararray,
    cs_ip:        chararray,
    cs_method:    chararray,
    cs_uri:       chararray,
    sc_status:    chararray,
    sc_bytes:     chararray,
    time_taken:   chararray,
    cs_referer:   chararray,
    cs_useragent: chararray,
    cs_cookie:    chararray
);

-- cast the numeric fields before storing
TLOGS = FOREACH LOGS GENERATE
    event_date,
    event_time,
    cs_ip,
    cs_method,
    cs_uri,
    (int)sc_status,
    (int)sc_bytes,
    time_taken,
    cs_referer,
    cs_useragent,
    cs_cookie;

STORE TLOGS into 'weblogs_data_parsed_pig' USING org.apache.hive.hcatalog.pig.HCatStorer();
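Because the script stores its output with HCatStorer, HCatalog support has to be enabled when Pig is launched. A sketch of the invocation (the script file name is an assumption); -useHCatalog puts the HCatalog and Hive client JARs on Pig's classpath so HCatStorer can resolve the target table:

pig -x tez -useHCatalog parse_weblogs_pig.pig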
PySpark
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os

from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *

# executor sizing and compression settings
# (shuffle/IO compressed with snappy, Parquet output left uncompressed)
conf = (SparkConf()
        .setAppName("parse_weblogs_spark")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.sql.parquet.compression.codec", "uncompressed")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.instances", "60")
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "20g"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# read the tab-delimited text files and split each record into its 11 fields
path = "/data/weblogs/*.dat"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblog_hits = parts.map(lambda o: Row(
    event_date=o[0], event_time=o[1], cs_ip=o[2], cs_method=o[3],
    cs_uri=o[4], sc_status=o[5], sc_bytes=o[6], time_taken=o[7],
    cs_referer=o[8], cs_useragent=o[9], cs_cookie=o[10]))

# build a DataFrame from the parsed rows and write it out as Parquet
schemaHits = sqlContext.createDataFrame(weblog_hits)
schemaHits.write.format("parquet").save("/tmp/parquet_query_output")
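The plain PySpark version can be submitted to YARN in client mode, matching the spark-submit pattern used for the Databricks variant below (the script file name is an assumption):

$SPARK_HOME/bin/spark-submit --master yarn-client parse_weblogs_spark.py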
PySpark with Databricks CSV parsing library
You will need to get the JAR file containing the CSV parsing library: https://github.com/databricks/spark-csv
When you submit the spark job, pass in the package:
$SPARK_HOME/bin/spark-submit --master yarn-client --packages com.databricks:spark-csv_2.11:1.4.0 my_python_script.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os

from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *

conf = (SparkConf()
        .setAppName("parse_weblogs_spark")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.sql.parquet.compression.codec", "uncompressed")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.instances", "60")
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "20g"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# declare the schema for the 11 tab-delimited weblog fields
customSchema = StructType([
    StructField("event_date", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("cs_ip", StringType(), True),
    StructField("cs_method", StringType(), True),
    StructField("cs_uri", StringType(), True),
    StructField("sc_status", IntegerType(), True),
    StructField("sc_bytes", IntegerType(), True),
    StructField("time_taken", StringType(), True),
    StructField("cs_referer", StringType(), True),
    StructField("cs_useragent", StringType(), True),
    StructField("cs_cookie", StringType(), True)])

# let the Databricks CSV reader parse the files against the schema,
# then persist the result as a Hive table
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t') \
    .load('/data/weblogs/*.dat', schema=customSchema)
df.saveAsTable("weblogs_parsed_spark")
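After the job completes, the Hive table written by saveAsTable can be spot-checked from the same HiveContext; a quick, illustrative sanity check:

# confirm the parsed table is queryable and look at a few parsed rows
sqlContext.sql("SELECT COUNT(*) FROM weblogs_parsed_spark").show()
sqlContext.sql("SELECT event_date, cs_uri, sc_status FROM weblogs_parsed_spark LIMIT 5").show()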
Created on 08-22-2016 10:00 PM
I am curious how https://pig.apache.org/docs/r0.9.1/api/org/apache/pig/piggybank/storage/CSVExcelStorage.html would compare to the Pig RegEx approach.
Created on 08-23-2016 05:57 AM
Thanks for the performance evaluation! Any explanation for why the Databricks CSV library performs so well?
Created on 09-05-2016 08:10 PM
The Databricks CSV library skips using core Spark. The map function in PySpark is run through a Python subprocess on each executor. When using Spark SQL with the Databricks CSV library, everything goes through the Catalyst optimizer and the output is Java bytecode. Scala/Java is about 40% faster than Python when using core Spark. I would guess that is the reason the second implementation is much faster. The CSV library is also probably more efficient at breaking up the records, applying the split partition by partition as opposed to record by record.
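To make that contrast concrete, here is a minimal sketch of the two code paths being compared, assuming the sc and sqlContext objects from the scripts above:

# RDD path: the Python lambda runs in a Python subprocess for every record
parts = sc.textFile("/data/weblogs/*.dat").map(lambda l: l.split("\t"))

# Spark SQL path with the Databricks CSV reader: parsing stays in the JVM
# and the plan goes through the Catalyst optimizer
df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .options(header="false", delimiter="\t") \
    .load("/data/weblogs/*.dat")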