SYNOPSIS

Spark and Data Frames have democratized Big Data processing. The days of writing complex custom parsers to parse and store weblog data are over.

You can easily use Spark's in-memory processing capabilities to quickly ingest and parse weblogs data.

For this example, I downloaded some Omniture weblogs data from Adobe for a fictional company:

  • The data consists of 421,266 records across 5 files, which I put into HDFS
  • Each record can contain up to 178 columns
  • The files are tab-delimited text
  • I use Spark with Data Frames via PySpark to parse out the fields we need and write them to a new Parquet file
  • I then build an external Hive table over this Parquet file so analysts can easily query the data
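
Since each record can have "up to" 178 columns, some rows may be shorter than the highest column position the script reads. The following is a minimal sketch, in plain Python, of a defensive parsing helper. The column positions and field names are taken from the PySpark script at the end of this article; the helper name `parse_hit`, the `WeblogHit` tuple, and the synthetic sample record are hypothetical and only for illustration.

```python
from collections import namedtuple

# Zero-based column positions as used by the article's PySpark script.
# The names are the article's own labels, not official Omniture headers.
FIELD_INDEXES = {
    "hit_timestamp": 1,
    "ip_address": 7,
    "url": 12,
    "swid": 13,
    "user_agent": 43,
    "city": 49,
    "country": 50,
    "state": 52,
}

WeblogHit = namedtuple("WeblogHit", sorted(FIELD_INDEXES))


def parse_hit(line):
    """Split one tab-delimited record and pick out the fields of interest.

    Returns None when the record has too few columns, so callers can filter
    such rows out instead of failing with an IndexError.
    """
    parts = line.rstrip("\n").split("\t")
    if len(parts) <= max(FIELD_INDEXES.values()):
        return None
    return WeblogHit(**{name: parts[i] for name, i in FIELD_INDEXES.items()})


# Synthetic record (not real Omniture data): 60 columns, column i holds "c<i>".
sample = "\t".join("c%d" % i for i in range(60))
hit = parse_hit(sample)
print(hit.url, hit.city)     # c12 c49
print(parse_hit("a\tb\tc"))  # None
```

In a Spark job, a helper like this could be mapped over the lines of the input and the `None` results filtered out before building the Data Frame. Whether the real files actually contain short records is not something this article establishes.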

The code is at the end of this article. Before showing it, here are some screenshots I took in Zeppelin during this exercise; they showcase how Zeppelin's UI lets you quickly develop, analyze, and visualize your work.

The code is a basic PySpark script to get you started with parsing text files and using Spark with Data Frames.

Zeppelin screenshot showing the record count across all the raw and unparsed files:

246280_1_new.png

Zeppelin screenshot showing a sample record from the raw and unparsed files:

246280_2_new.png

Zeppelin query visualization showing the top 5 most visited web pages from the parsed data:

4561-screen-shot-2016-05-26-at-122526-am.png

Zeppelin query visualization showing the distribution of page view counts by hour from the parsed data:

4560-screen-shot-2016-05-26-at-122037-am.png
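
The queries behind these two charts are not shown in the article, so the following is only a sketch of what they might look like. It assumes the `weblogs_hit` temporary table registered by the script below, and it assumes `hit_timestamp` is a string shaped like `YYYY-MM-DD HH:MM:SS`, which the article does not confirm. The constant names and `run_reports` are hypothetical.

```python
# Hypothetical queries; the article does not show the ones behind its charts.
# "weblogs_hit" is the temporary table name registered by the PySpark script.
TOP_PAGES_SQL = """
SELECT url, COUNT(*) AS page_views
FROM weblogs_hit
GROUP BY url
ORDER BY page_views DESC
LIMIT 5
"""

# Assumes hit_timestamp is a string such as '2012-03-01 14:05:09', so that
# characters 12-13 hold the hour. Adjust if the real format differs.
VIEWS_BY_HOUR_SQL = """
SELECT SUBSTR(hit_timestamp, 12, 2) AS hit_hour, COUNT(*) AS page_views
FROM weblogs_hit
GROUP BY SUBSTR(hit_timestamp, 12, 2)
ORDER BY hit_hour
"""


def run_reports(sql_context):
    """Run both queries through any object exposing a .sql(query) method."""
    return {
        "top_pages": sql_context.sql(TOP_PAGES_SQL),
        "views_by_hour": sql_context.sql(VIEWS_BY_HOUR_SQL),
    }
```

In Zeppelin, the query text could be pasted into a `%sql` paragraph to get the built-in chart controls, or `run_reports(sqlContext)` could be called from a PySpark paragraph.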

PySpark code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext

## Spark configuration: fixed executor count, snappy-compressed Parquet output
conf = (SparkConf()
        .setAppName("parse_weblogs")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.cores", "4")
        .set("spark.executor.instances", "20")
        .set("spark.sql.parquet.compression.codec", "snappy")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

## read text file and parse out fields needed
## file is tab delimited
path = "hdfs://ip-000-00-0-00.xxxx.xxxx.internal:8020/landing/weblogs/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
## p[n] is the zero-based column position within each record
weblogs_hit = parts.map(lambda p: Row(
    hit_timestamp=p[1],
    swid=p[13],
    ip_address=p[7],
    url=p[12],
    user_agent=p[43],
    city=p[49],
    country=p[50],
    state=p[52]))

## create a Data Frame from the fields we parsed
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register Data Frame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## do some basic formatting and convert some values to uppercase
rows = sqlContext.sql(
    "SELECT hit_timestamp, swid, ip_address, url, user_agent, "
    "UPPER(city) AS city, UPPER(country) AS country, UPPER(state) AS state "
    "FROM weblogs_hit")

## coalesce to one partition so the output is a single Parquet file
rows.coalesce(1).write.mode('overwrite').format("parquet").save("/data/weblogs_parsed_parquet")
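
The script stops after writing the Parquet output, while the summary mentions building an external Hive table over it. The DDL for that step is not included in the article, so here is a minimal sketch of how it could be generated. The table name `weblogs_parsed` and the helper `build_external_table_ddl` are hypothetical; the column names and the location come from the script above, and every column is declared `STRING` on the assumption that the parsed fields are all plain text.

```python
def build_external_table_ddl(table_name, column_names, location):
    """Return Hive DDL for an external table over an existing Parquet directory.

    Every column is declared STRING because the script's fields come straight
    from splitting text lines.
    """
    columns = ",\n  ".join("%s STRING" % name for name in column_names)
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS %s (\n  %s\n)\n"
        "STORED AS PARQUET\n"
        "LOCATION '%s'" % (table_name, columns, location)
    )


ddl = build_external_table_ddl(
    "weblogs_parsed",  # hypothetical table name
    ["hit_timestamp", "swid", "ip_address", "url",
     "user_agent", "city", "country", "state"],
    "/data/weblogs_parsed_parquet",  # output path used by the script
)
print(ddl)
```

The printed statement could be run from the Hive CLI or Beeline, or passed to a `HiveContext`'s `sql` method. Because the table is external, dropping it later would leave the Parquet data in place.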


screen-shot-2016-05-26-at-121911-am.png

Comments

Hi,

Could anyone help me get the column names/headers (177 columns) for the Omniture weblogs data downloaded from Adobe for a fictional company, along with the link/URL to download the weblogs? This would help a lot.

Thanks in advance.