Created on 05-26-2016 08:58 PM - edited on 02-25-2020 04:25 AM by VidyaSargur
SYNOPSIS
The democratization of Big Data processing is here with Spark and DataFrames. The days of writing complex parsers to parse and store weblog data are over.
You can use Spark's in-memory processing to quickly ingest and parse weblog data.
For this example, I downloaded sample Omniture weblog data from Adobe for a fictional company:
The data consists of 421,266 records across 5 files, which I loaded into HDFS
Each record can contain up to 178 columns
The files are tab-delimited text
Use Spark DataFrames via PySpark to parse out the fields we need and write them to a new Parquet file
Build an external Hive table over this Parquet file so analysts can easily query the data
The code is at the end of this article. Before the code, here are some Zeppelin screenshots I took during this exercise; they show how Zeppelin's UI lets you quickly develop, analyze, and visualize your work.
It is a basic PySpark script to get you started parsing text files and working with Spark DataFrames.
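To see what the split-and-pick step does to a single record without a cluster, here is a plain-Python sketch. It reuses the field positions from the PySpark script at the end of this article; the sample record is synthetic, and skipping records with fewer than 53 fields is an assumption about how to handle short records, not something the data guarantees. The names `FIELD_POSITIONS` and `parse_record` are hypothetical.

```python
# Plain-Python sketch of the per-record parse step (no Spark required).
# Field positions mirror the PySpark script; the sample record is synthetic.

FIELD_POSITIONS = {
    "hit_timestamp": 1,
    "ip_address": 7,
    "url": 12,
    "swid": 13,
    "user_agent": 43,
    "city": 49,
    "country": 50,
    "state": 52,
}

# A record needs at least this many fields to contain every position above.
MIN_FIELDS = max(FIELD_POSITIONS.values()) + 1


def parse_record(line):
    """Split one tab-delimited line and return a dict of the wanted fields,
    or None if the record is too short to hold all of them (assumed policy)."""
    parts = line.rstrip("\n").split("\t")
    if len(parts) < MIN_FIELDS:
        return None
    return {name: parts[pos] for name, pos in FIELD_POSITIONS.items()}


if __name__ == "__main__":
    # Build a synthetic 178-column record with a few known values.
    fields = [""] * 178
    fields[1] = "2012-03-15 14:16:02"
    fields[12] = "http://www.example.com/home"
    sample = "\t".join(fields)
    print(parse_record(sample)["url"])  # http://www.example.com/home
    print(parse_record("too\tshort"))   # None
```

In Spark the same function could be used as `lines.map(parse_record).filter(lambda r: r is not None)`, which drops short records instead of failing the job with an `IndexError`.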
Zeppelin screenshot showing the record count across all the raw, unparsed files:
Zeppelin screenshot showing a sample record from the raw, unparsed files:
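When choosing positions such as `p[1]` or `p[13]` for a raw record like this one, it helps to print each non-empty field next to its index. Below is a small plain-Python helper for that; the name `indexed_fields` is hypothetical. In the PySpark shell you could pass it `lines.first()` from the script at the end of this article.

```python
# Sketch: list (index, value) pairs for the fields of one raw tab-delimited
# record, to help pick the positions used when parsing.


def indexed_fields(line, skip_empty=True):
    """Return (index, value) pairs; empty fields are skipped by default."""
    parts = line.rstrip("\n").split("\t")
    return [(i, v) for i, v in enumerate(parts) if v or not skip_empty]


if __name__ == "__main__":
    sample = "\t".join(["", "2012-03-15 14:16:02", "", "10.0.0.1"])
    for index, value in indexed_fields(sample):
        print(index, value)
    # 1 2012-03-15 14:16:02
    # 3 10.0.0.1
```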
Using Zeppelin's query visualization to show the top 5 most visited web pages in the parsed data:
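The chart above comes from a simple group, count, and sort. A Spark SQL/HiveQL query along the lines of the one in the comment would produce it; the table name `weblogs_parsed` is hypothetical. The plain-Python `Counter` version below shows the same logic on a small made-up list of URLs.

```python
from collections import Counter

# Equivalent Spark SQL / HiveQL (table name weblogs_parsed is hypothetical):
#   SELECT url, COUNT(*) AS views FROM weblogs_parsed
#   GROUP BY url ORDER BY views DESC LIMIT 5


def top_pages(urls, n=5):
    """Return the n most frequent URLs as (url, count) pairs, highest first."""
    return Counter(urls).most_common(n)


if __name__ == "__main__":
    urls = ["/home", "/cart", "/home", "/search", "/home", "/cart"]
    print(top_pages(urls, n=2))  # [('/home', 3), ('/cart', 2)]
```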
Using Zeppelin's query visualization to show the distribution of page view counts by hour in the parsed data:
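The hourly distribution is another group-and-count, keyed on the hour of `hit_timestamp`. This sketch assumes timestamps look like `2012-03-15 14:16:02`; check a sample record before relying on that format. The table name in the SQL comment and the function name `views_by_hour` are hypothetical.

```python
from collections import Counter
from datetime import datetime

# Equivalent Spark SQL / HiveQL (table name weblogs_parsed is hypothetical):
#   SELECT hour(hit_timestamp) AS hit_hour, COUNT(*) AS views
#   FROM weblogs_parsed GROUP BY hour(hit_timestamp) ORDER BY hit_hour

# Assumed timestamp layout.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def views_by_hour(timestamps, fmt=TIMESTAMP_FORMAT):
    """Count page views per hour of day (0-23), skipping unparseable values."""
    counts = Counter()
    for ts in timestamps:
        try:
            counts[datetime.strptime(ts, fmt).hour] += 1
        except ValueError:
            continue  # malformed or empty timestamp
    # Include every hour, even those with no views, so gaps show up in a chart.
    return [(hour, counts[hour]) for hour in range(24)]


if __name__ == "__main__":
    hits = ["2012-03-15 14:16:02", "2012-03-15 14:59:59", "2012-03-16 09:00:00", ""]
    by_hour = dict(views_by_hour(hits))
    print(by_hour[14], by_hour[9], by_hour[0])  # 2 1 0
```

Returning all 24 hours is a design choice: the SQL version only returns hours that have at least one hit.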
PySpark code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext
## configure executors and write Snappy-compressed output
conf = (SparkConf()
        .setAppName("parse_weblogs")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.cores", "4")
        .set("spark.executor.instances", "20")
        .set("spark.sql.parquet.compression.codec", "snappy")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
## read the text files and parse out the fields we need
## the files are tab-delimited
path = "hdfs://ip-000-00-0-00.xxxx.xxxx.internal:8020/landing/weblogs/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
## skip records too short to contain every field we index (highest index is 52)
parts = parts.filter(lambda p: len(p) > 52)
weblogs_hit = parts.map(lambda p: Row(hit_timestamp=p[1], swid=p[13], ip_address=p[7],
                                      url=p[12], user_agent=p[43], city=p[49],
                                      country=p[50], state=p[52]))
## create a DataFrame from the fields we parsed
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)
## register the DataFrame as a temporary table so it can be queried with SQL
schema_weblogs_hit.registerTempTable("weblogs_hit")
## convert city, country, and state to uppercase
rows = sqlContext.sql("""
    SELECT hit_timestamp, swid, ip_address, url, user_agent,
           UPPER(city) AS city, UPPER(country) AS country, UPPER(state) AS state
    FROM weblogs_hit""")
## coalesce to one partition so the output directory holds a single Parquet part file
rows.coalesce(1).write.mode("overwrite").format("parquet").save("/data/weblogs_parsed_parquet")
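The script above writes the Parquet output but stops short of the external Hive table mentioned in the synopsis. Here is a minimal sketch that builds the HiveQL DDL for it. The table name `weblogs_parsed` and the function name are hypothetical; the location and column names match the script, and every column is `STRING` because the parsed fields are strings.

```python
# Sketch: build a HiveQL CREATE EXTERNAL TABLE statement over the Parquet output.

PARSED_COLUMNS = ["hit_timestamp", "swid", "ip_address", "url",
                  "user_agent", "city", "country", "state"]


def external_table_ddl(table, columns, location):
    """Return DDL for an external Hive table over a Parquet directory."""
    column_list = ",\n  ".join(f"{name} STRING" for name in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {column_list}\n)\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


if __name__ == "__main__":
    ddl = external_table_ddl("weblogs_parsed", PARSED_COLUMNS,
                             "/data/weblogs_parsed_parquet")
    print(ddl)
```

You could run the printed statement in Beeline or the Hive CLI, or pass it to `.sql(...)` on a Hive-enabled context (`HiveContext` in Spark 1.x, or a `SparkSession` built with `enableHiveSupport()` in Spark 2 and later). Because the table is external, dropping it leaves the Parquet files in place.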
Could anyone help me get the column names/headers (177 columns) for the downloaded Omniture weblog data from Adobe for a fictional company, and also the link/URL to download the weblogs? This would help a lot.