Created on 08-22-2016 08:27 PM
Both Pig and Spark (PySpark) excel at iterative processing of weblog data in text-delimited format.
Is one faster than the other?
A sample weblog record looks like this:

```
2015-02-12 15:00:26 198.102.62.250 GET /origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/World_Street_Map/MapServer/tile/6.386012949206202/50/25 200 2649 0 "http://www.DOMAIN.com" "Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -
```
| Data Volume | Pig on Tez | PySpark | PySpark using Databricks CSV parsing library |
| --- | --- | --- | --- |
| 75 million records across 15 files in HDFS, totaling 21.5 GB | 1 min, 51 sec | 2 min, 13 sec | 1 min, 21 sec |
| 150 million records across 30 files in HDFS, totaling 43.0 GB | 2 min, 53 sec | 3 min, 43 sec | 2 min, 2 sec |
The scripts for Pig, PySpark, and PySpark using the Databricks CSV parsing library are all shown below.
Pig:

```pig
set job.name 'parse_weblogs_pig';

TEXT_LOGS = LOAD '/data/weblogs/*.dat' USING TextLoader() AS (line:chararray);

VALID_LOGS = FILTER TEXT_LOGS BY line MATCHES '^\\s*\\w.*';

LOGS = FOREACH VALID_LOGS GENERATE FLATTEN(
    REGEX_EXTRACT_ALL(line, '^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+"([^"]*)"\\s+([-]|"[^"]*")\\s+"?([^"]*)"?')
  ) AS (
    event_date:   chararray,
    event_time:   chararray,
    cs_ip:        chararray,
    cs_method:    chararray,
    cs_uri:       chararray,
    sc_status:    chararray,
    sc_bytes:     chararray,
    time_taken:   chararray,
    cs_referer:   chararray,
    cs_useragent: chararray,
    cs_cookie:    chararray
  );

TLOGS = FOREACH LOGS GENERATE
    event_date,
    event_time,
    cs_ip,
    cs_method,
    cs_uri,
    (int)sc_status,
    (int)sc_bytes,
    time_taken,
    cs_referer,
    cs_useragent,
    cs_cookie;

STORE TLOGS INTO 'weblogs_data_parsed_pig' USING org.apache.hive.hcatalog.pig.HCatStorer();
```
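To see what those capture groups pull out of the sample record above, here is a quick standalone Python sketch that applies the same pattern. It is illustrative only (not part of the benchmark); the field names follow the AS clause in the Pig script.

```python
import re

# Same pattern as the Pig REGEX_EXTRACT_ALL call above
PATTERN = re.compile(
    r'^(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+'
    r'"([^"]*)"\s+([-]|"[^"]*")\s+"?([^"]*)"?')

FIELDS = ["event_date", "event_time", "cs_ip", "cs_method", "cs_uri",
          "sc_status", "sc_bytes", "time_taken", "cs_referer",
          "cs_useragent", "cs_cookie"]

sample = ('2015-02-12 15:00:26 198.102.62.250 GET '
          '/origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/'
          'services/World_Street_Map/MapServer/tile/6.386012949206202/50/25 '
          '200 2649 0 "http://www.DOMAIN.com" "Mozilla/5.0 (Linux; U; Android '
          '4.1.2; es-us; GT-N8010 Build/JZO54K) AppleWebKit/534.30 (KHTML, '
          'like Gecko) Version/4.0 Safari/534.30" -')

match = PATTERN.match(sample)
if match:
    # Print each field name next to the value the regex extracted
    for name, value in zip(FIELDS, match.groups()):
        print("%-13s %s" % (name, value))
```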
PySpark:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row

# Fixed executors, uncompressed Parquet output, snappy-compressed shuffle/IO
conf = (SparkConf()
        .setAppName("parse_weblogs_spark")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.sql.parquet.compression.codec", "uncompressed")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.instances", "60")
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "20g"))

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Read the tab-delimited text files and parse out the fields needed
path = "/data/weblogs/*.dat"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblog_hits = parts.map(lambda o: Row(event_date=o[0], event_time=o[1],
                                      cs_ip=o[2], cs_method=o[3], cs_uri=o[4],
                                      sc_status=o[5], sc_bytes=o[6],
                                      time_taken=o[7], cs_referer=o[8],
                                      cs_useragent=o[9], cs_cookie=o[10]))

# Convert to a DataFrame and write out as Parquet
schemaHits = sqlContext.createDataFrame(weblog_hits)
schemaHits.write.format("parquet").save("/tmp/parquet_query_output")
```
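Note that, unlike the Pig script, this version leaves sc_status and sc_bytes as strings. If the same integer typing is wanted, the Row construction could be adjusted along the lines of the sketch below; it assumes the same tab-split field positions and that every row parses cleanly.

```python
from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext(appName="parse_weblogs_spark_typed")

# Same tab-split as above, but with sc_status / sc_bytes cast to int
# to mirror the (int) casts in the Pig script. Assumes 11 fields per row
# and numeric values in positions 5 and 6.
parts = sc.textFile("/data/weblogs/*.dat").map(lambda l: l.split("\t"))
typed_hits = parts.map(lambda o: Row(event_date=o[0], event_time=o[1],
                                     cs_ip=o[2], cs_method=o[3], cs_uri=o[4],
                                     sc_status=int(o[5]), sc_bytes=int(o[6]),
                                     time_taken=o[7], cs_referer=o[8],
                                     cs_useragent=o[9], cs_cookie=o[10]))
```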
For the PySpark version that uses the Databricks CSV parsing library, you will need the JAR containing the library: https://github.com/databricks/spark-csv
When you submit the Spark job, pass in the package:

```bash
$SPARK_HOME/bin/spark-submit --master yarn-client --packages com.databricks:spark-csv_2.11:1.4.0 my_python_script.py
```
The script itself:

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

conf = (SparkConf()
        .setAppName("parse_weblogs_spark")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.sql.parquet.compression.codec", "uncompressed")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.instances", "60")
        .set("spark.executor.cores", 10)
        .set("spark.executor.memory", "20g"))

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Schema for the tab-delimited weblog records
customSchema = StructType([
    StructField("event_date", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("cs_ip", StringType(), True),
    StructField("cs_method", StringType(), True),
    StructField("cs_uri", StringType(), True),
    StructField("sc_status", IntegerType(), True),
    StructField("sc_bytes", IntegerType(), True),
    StructField("time_taken", StringType(), True),
    StructField("cs_referer", StringType(), True),
    StructField("cs_useragent", StringType(), True),
    StructField("cs_cookie", StringType(), True)])

# Let the Databricks CSV data source do the parsing
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t') \
    .load('/data/weblogs/*.dat', schema=customSchema)

df.saveAsTable("weblogs_parsed_spark")
```
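Once the table has been written, it can be queried through a HiveContext. The snippet below is only an illustrative sanity check (not part of the benchmark); the table name comes from the script above, and the aggregation is an arbitrary example.

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="check_weblogs_parsed_spark")
sqlContext = HiveContext(sc)

# Status-code breakdown from the table written by the script above
status_counts = sqlContext.sql("""
    SELECT sc_status, COUNT(*) AS hits
    FROM weblogs_parsed_spark
    GROUP BY sc_status
    ORDER BY hits DESC
""")
status_counts.show()
```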
Created on 08-22-2016 10:00 PM
I am curious how https://pig.apache.org/docs/r0.9.1/api/org/apache/pig/piggybank/storage/CSVExcelStorage.html would compare to the Pig RegEx approach.
Created on 08-23-2016 05:57 AM
Thanks for the performance evaluation! Any explanation for why the Databricks CSV library performs so well?
Created on 09-05-2016 08:10 PM
The Databricks CSV library bypasses the core Spark (RDD) code path. A map function in PySpark is run through a Python subprocess on each executor, whereas with Spark SQL and the Databricks CSV library everything goes through the Catalyst optimizer and the output is Java bytecode. Scala/Java is roughly 40% faster than Python when using core Spark, and I would guess that is the reason the second PySpark implementation is much faster. The CSV library is probably also more efficient at breaking up the records, applying the split partition by partition rather than record by record.
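To make the distinction concrete, here is a minimal sketch of the two code paths being compared (the input path and column positions are assumed for illustration): the first runs a Python lambda in a worker subprocess for every record, while the second is planned by Catalyst and executed entirely in the JVM.

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

sc = SparkContext(appName="python_vs_catalyst_sketch")
sqlContext = HiveContext(sc)

# Path 1: core Spark RDD + Python lambda.
# Each record is shipped to a Python subprocess on the executor,
# split there, and shipped back -- serialization in both directions.
rdd_rows = (sc.textFile("/data/weblogs/*.dat")
              .map(lambda l: l.split("\t"))
              .map(lambda o: Row(event_date=o[0], sc_status=o[5])))
rdd_df = sqlContext.createDataFrame(rdd_rows)

# Path 2: Spark SQL data source (spark-csv here).
# Parsing and projection are planned by Catalyst and run in the JVM;
# there is no per-record round trip through Python.
csv_df = (sqlContext.read
          .format("com.databricks.spark.csv")
          .options(header="false", delimiter="\t")
          .load("/data/weblogs/*.dat"))

rdd_df.count()   # exercises the Python-subprocess path
csv_df.count()   # exercises the JVM-only path
```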