Synopsis

Both Pig and Spark (PySpark) excel at iterative processing of weblog data stored as delimited text.

Is one faster than the other?

  • Pig on Tez outperforms plain PySpark
  • PySpark using the Databricks CSV parser outperforms Pig on Tez

Pig

  • Apache Pig allows users to write MapReduce transformations using a simple scripting language called Pig Latin.
  • Pig translates the Pig Latin script into MapReduce so that it can be executed within YARN against a dataset stored in the Hadoop Distributed File System (HDFS).
  • Apache Tez is a distributed data processing framework for building batch and interactive applications.
  • Tez improves the MapReduce paradigm by dramatically improving its speed, while maintaining MapReduce’s ability to scale to petabytes of data.
  • Pig on Tez excels at iterative data processing. It is fast, easy, and the optimal solution to implement when it comes to parsing weblogs.

Spark

  • General purpose in-memory distributed data processing engine
  • APIs to write code in Java, Scala, or Python
  • Libraries for SQL, Streaming, Machine Learning, and Graph Processing
  • Excellent for iterative data processing and a good choice for parsing weblogs

Test

  • The test case comes from a customer who parses tab-delimited weblog text data consisting of 11 fields. Sample record:

    2015-02-12 15:00:26 198.102.62.250 GET /origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/World_Street_Map/MapServer/tile/6.386012949206202/50/25 200 2649 0 "http://www.DOMAIN.com" "Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -

  • Their Pig script uses regular expressions to parse out the individual fields
  • Wrote a PySpark script to parse out the weblogs data
  • Wrote another PySpark script using the Databricks CSV parsing library to parse out the weblogs data

Test Results

Data Volume                                                    | Pig on Tez    | PySpark       | PySpark using Databricks CSV parsing library
75 million records across 15 files in HDFS, totaling 21.5 GB   | 1 min, 51 sec | 2 min, 13 sec | 1 min, 21 sec
150 million records across 30 files in HDFS, totaling 43.0 GB  | 2 min, 53 sec | 3 min, 43 sec | 2 min, 2 sec
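
To put the timings on a common scale, they can be converted to seconds and to records per second. The following is a minimal sketch in plain Python, not part of the original test. The function and variable names are made up for illustration, and the first timing column is read as the Pig on Tez run, matching the Conclusion.

```python
# Timings copied from the Test Results table, as (minutes, seconds).
# Assumption: the first timing column is the Pig on Tez run.
RESULTS = [
    {
        "label": "75 million records, 21.5 GB",
        "records": 75 * 10**6,
        "timings": {
            "Pig on Tez": (1, 51),
            "PySpark": (2, 13),
            "PySpark + CSV library": (1, 21),
        },
    },
    {
        "label": "150 million records, 43.0 GB",
        "records": 150 * 10**6,
        "timings": {
            "Pig on Tez": (2, 53),
            "PySpark": (3, 43),
            "PySpark + CSV library": (2, 2),
        },
    },
]


def to_seconds(minutes, seconds):
    """Convert a (minutes, seconds) timing to total seconds."""
    return minutes * 60 + seconds


def records_per_second(records, timing):
    """Average throughput for one run."""
    return records / float(to_seconds(*timing))


def speedup(timings, baseline, candidate):
    """How many times faster `candidate` finished than `baseline`."""
    return to_seconds(*timings[baseline]) / float(to_seconds(*timings[candidate]))


if __name__ == "__main__":
    for result in RESULTS:
        print(result["label"])
        for engine, timing in sorted(result["timings"].items()):
            rate = records_per_second(result["records"], timing)
            print("  %-22s %4d s  %12.0f records/s" % (engine, to_seconds(*timing), rate))
```

On these numbers, the CSV-library run finishes roughly 1.4 times faster than Pig on Tez and about 1.6 to 1.8 times faster than plain PySpark, while Pig on Tez is about 1.2 to 1.3 times faster than plain PySpark.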

Conclusion

  • Pig on Tez beats plain PySpark, and it does so with native, out-of-the-box functionality.
  • PySpark using the Databricks CSV parsing library beats Pig on Tez; however, this is not native functionality and requires the overhead of maintaining an external JAR file developed by a third party.

Code

The Pig script, the PySpark script, and the PySpark script using the Databricks CSV parsing library are all shown below.

Pig

TEXT_LOGS = load '/data/weblogs/*.dat' using TextLoader AS (line:chararray);
set job.name 'parse_weblogs_pig'
VALID_LOGS = FILTER TEXT_LOGS by line matches '^\\s*\\w.*';
LOGS = FOREACH VALID_LOGS GENERATE
    FLATTEN (
        REGEX_EXTRACT_ALL(line, '^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+"([^"]*)"\\s+([-]|"[^"]*")\\s+"?([^"]*)"?')
    )
    AS (
        event_date:     chararray,
        event_time:     chararray,
        cs_ip:          chararray,
        cs_method:      chararray,
        cs_uri:         chararray,
        sc_status:      chararray,
        sc_bytes:       chararray,
        time_taken:     chararray,
        cs_referer:     chararray,
        cs_useragent:   chararray,
        cs_cookie:      chararray
    );
TLOGS = FOREACH LOGS GENERATE
    event_date,
    event_time,
    cs_ip,
    cs_method,
    cs_uri,
    (int)sc_status,
    (int)sc_bytes,
    time_taken,
    cs_referer,
    cs_useragent,
    cs_cookie
    ;
STORE TLOGS into 'weblogs_data_parsed_pig' USING org.apache.hive.hcatalog.pig.HCatStorer();
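
To see what the regular expression in the Pig script extracts, the same pattern can be tried in plain Python against the sample record from the Test section. This is only a sketch for experimenting with the pattern, not part of the benchmark. The helper names (`parse_line`, `to_int`) are invented here, and `fullmatch` is used on the assumption that Pig's REGEX_EXTRACT_ALL requires the whole line to match.

```python
import re

# Same pattern as the Pig script. Pig needs doubled backslashes inside its
# string literal; a Python raw string does not.
WEBLOG_PATTERN = re.compile(
    r'^(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)'
    r'\s+"([^"]*)"\s+([-]|"[^"]*")\s+"?([^"]*)"?'
)

# Field names taken from the AS (...) clause of the Pig script.
FIELD_NAMES = [
    "event_date", "event_time", "cs_ip", "cs_method", "cs_uri", "sc_status",
    "sc_bytes", "time_taken", "cs_referer", "cs_useragent", "cs_cookie",
]

# Sample record from the Test section, with single spaces between fields.
SAMPLE = (
    '2015-02-12 15:00:26 198.102.62.250 GET '
    '/origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/'
    'World_Street_Map/MapServer/tile/6.386012949206202/50/25 '
    '200 2649 0 "http://www.DOMAIN.com" '
    '"Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) '
    'AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -'
)


def to_int(text):
    """Mimic the (int) casts in TLOGS: unparseable values become None."""
    try:
        return int(text)
    except ValueError:
        return None


def parse_line(line):
    """Return a dict of the 11 fields, or None if the line does not match."""
    match = WEBLOG_PATTERN.fullmatch(line.rstrip("\n"))
    if match is None:
        return None
    record = dict(zip(FIELD_NAMES, match.groups()))
    record["sc_status"] = to_int(record["sc_status"])
    record["sc_bytes"] = to_int(record["sc_bytes"])
    return record


if __name__ == "__main__":
    for name, value in parse_line(SAMPLE).items():
        print("%-13s %r" % (name, value))
```

Note that the tenth capture group keeps the surrounding double quotes of the user agent, whereas the ninth group (the referer) strips them.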

PySpark

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *

## shuffle and internal I/O use snappy; Parquet output is written uncompressed
conf = (SparkConf()
         .setAppName("parse_weblogs_spark")
         .set("spark.dynamicAllocation.enabled", "false")
         .set("spark.sql.parquet.compression.codec", "uncompressed")
         .set("spark.shuffle.compress", "true")
         .set("spark.io.compression.codec", "snappy")
         .set("spark.executor.instances", "60")
         .set("spark.executor.cores", 10)
         .set("spark.executor.memory", "20g"))
sc = SparkContext(conf = conf)

sqlContext = HiveContext(sc)

## read text file
## and parse out fields needed
## file is tab delimited
path = "/data/weblogs/*.dat"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblog_hits = parts.map(lambda o: Row(event_date=o[0], event_time=o[1], cs_ip=o[2], cs_method=o[3], cs_uri=o[4],sc_status=o[5],sc_bytes=o[6],time_taken=o[7],cs_referer=o[8],cs_useragent=o[9],cs_cookie=o[10]))


schemaHits = sqlContext.createDataFrame(weblog_hits)
schemaHits.write.format("parquet").save("/tmp/parquet_query_output")
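
The script above indexes `o[0]` through `o[10]` directly, so a line with fewer than 11 tab-separated fields would raise an IndexError, and all fields stay strings. A more defensive variant of the split step could look like the following plain-Python sketch. The function name `parse_tab_line` and the decision to drop malformed lines are assumptions made for illustration, not part of the tested script.

```python
# Field names as used in the Row(...) call of the PySpark script.
FIELD_NAMES = [
    "event_date", "event_time", "cs_ip", "cs_method", "cs_uri", "sc_status",
    "sc_bytes", "time_taken", "cs_referer", "cs_useragent", "cs_cookie",
]

# Fields that the Pig script and the CSV schema treat as integers.
INT_FIELDS = ("sc_status", "sc_bytes")


def parse_tab_line(line):
    """Split one tab-delimited line into a dict, or return None if malformed."""
    values = line.rstrip("\n").split("\t")
    if len(values) != len(FIELD_NAMES):
        return None  # wrong number of fields: skip instead of raising
    record = dict(zip(FIELD_NAMES, values))
    for name in INT_FIELDS:
        try:
            record[name] = int(record[name])
        except ValueError:
            record[name] = None  # non-numeric value becomes a null
    return record
```

In the PySpark script, a function like this could be applied with `lines.map(parse_tab_line).filter(lambda r: r is not None)` before building the `Row` objects. The extra checks run in Python for every record, so they would add to the measured run time rather than reduce it.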

PySpark with Databricks CSV parsing library

You will need to get the JAR file containing the CSV parsing library: https://github.com/databricks/spark-csv

When you submit the spark job, pass in the package:

$SPARK_HOME/bin/spark-submit --master yarn-client --packages com.databricks:spark-csv_2.11:1.4.0 my_python_script.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *


conf = (SparkConf()
         .setAppName("parse_weblogs_spark")
         .set("spark.dynamicAllocation.enabled", "false")
         .set("spark.sql.parquet.compression.codec", "uncompressed")
         .set("spark.shuffle.compress", "true")
         .set("spark.io.compression.codec", "snappy")
         .set("spark.executor.instances", "60")
         .set("spark.executor.cores", 10)
         .set("spark.executor.memory", "20g"))
sc = SparkContext(conf = conf)

sqlContext = HiveContext(sc)

customSchema = StructType([ \
    StructField("event_date", StringType(), True), \
    StructField("event_time", StringType(), True), \
    StructField("cs_ip", StringType(), True), \
    StructField("cs_method", StringType(), True), \
    StructField("cs_uri", StringType(), True), \
    StructField("sc_status", IntegerType(), True), \
    StructField("sc_bytes", IntegerType(), True), \
    StructField("time_taken", StringType(), True), \
    StructField("cs_referer", StringType(), True), \
    StructField("cs_useragent", StringType(), True), \
    StructField("cs_cookie", StringType(), True)])


df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false', delimiter='\t') \
    .load('/data/weblogs/*.dat', schema = customSchema)

# DataFrame.saveAsTable() is deprecated since Spark 1.4; use the DataFrameWriter instead
df.write.saveAsTable("weblogs_parsed_spark")
Comments
Contributor

Thanks for the performance evaluation! Any explanation for why the Databricks CSV library performs so well?

Super Collaborator

The Databricks CSV library bypasses core Spark. In PySpark, the map function runs in a Python subprocess on each executor. With Spark SQL and the Databricks CSV library, everything goes through the Catalyst optimizer and the output is Java bytecode. Scala/Java is about 40% faster than Python when using core Spark, which I would guess is why the second implementation is so much faster. The CSV library is probably also much more efficient at breaking up the records, likely applying the split partition by partition rather than record by record.
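
The difference between splitting record by record and splitting partition by partition can be sketched in plain Python. A generator like the one below takes all lines of one partition at once, and in PySpark it would be passed to `RDD.mapPartitions` instead of `RDD.map`. The name `parse_partition` and its parameters are hypothetical, the code still runs in the Python subprocess, and whether the CSV library works this way internally is the guess made above, not something this sketch demonstrates.

```python
def parse_partition(lines, delimiter="\t", expected_fields=11):
    """Yield one list of fields per well-formed line in a partition.

    `lines` is any iterable of strings, for example the iterator that
    Spark hands to a mapPartitions function.
    """
    for line in lines:
        fields = line.rstrip("\n").split(delimiter)
        if len(fields) == expected_fields:
            yield fields


if __name__ == "__main__":
    # A made-up three-field "partition" to show the call shape.
    partition = ["a\tb\tc\n", "malformed line\n", "x\ty\tz\n"]
    for row in parse_partition(partition, expected_fields=3):
        print(row)
```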