Created on 05-26-2016 08:58 PM - edited on 02-25-2020 04:25 AM by VidyaSargur
SYNOPSIS
Democratization of Big Data processing is here with Spark and Data Frames. The days of writing complex custom parsers to parse and store weblog data are over.
You can use Spark's in-memory processing capabilities to quickly ingest and parse weblog data.
For this example, I downloaded some Omniture weblog data from Adobe for a fictional company:
- The data consists of 421,266 records across 5 files, which I put into HDFS
- Each record can contain up to 178 columns
- The files are tab-delimited text
- Use Spark with Data Frames via PySpark to parse out the fields we need and write the result to a new Parquet file
- Build an external Hive table over this Parquet file so analysts can easily query the data (a sketch of this step follows the PySpark code below)
The code is at the end of this article. Before showing it, here are a few screenshots from Zeppelin that I took while doing this exercise, to showcase how quickly you can develop, analyze, and visualize your work in Zeppelin's UI.
The code is a basic PySpark script to get you started with parsing text files and using Spark with Data Frames.
Zeppelin screenshot showing the record count across all the raw and unparsed files:
Zeppelin screenshot showing a sample record from the raw and unparsed files:
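The screenshots aren't reproduced here, but both of those checks boil down to one-liners against the raw RDD. A minimal sketch, reusing the sc and path variables defined in the script at the end of this article:

## sketch: count and peek at the raw, unparsed records
lines = sc.textFile(path)   ## path is the HDFS glob used in the script below
print(lines.count())        ## total number of records across all 5 files
print(lines.first())        ## one sample tab-delimited record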
Use Zeppelin's query visualization features to show the top 5 most visited web pages from the parsed data:
Use Zeppelin's query visualization features to show the distribution of page view counts by hour from the parsed data:
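The exact queries behind those two charts aren't included in the article, so the following is only a rough sketch run against the weblogs_hit temporary table registered in the script below; in particular, treating hit_timestamp as a Unix epoch in seconds is my assumption, not something stated in the original post.

## sketch: approximate queries behind the two Zeppelin charts above
## top 5 most visited web pages
top_pages = sqlContext.sql("""
    SELECT url, COUNT(*) AS hits
    FROM weblogs_hit
    GROUP BY url
    ORDER BY hits DESC
    LIMIT 5""")
top_pages.show()

## page view counts by hour (assumes hit_timestamp is a Unix epoch in seconds)
views_by_hour = sqlContext.sql("""
    SELECT HOUR(FROM_UNIXTIME(CAST(hit_timestamp AS BIGINT))) AS hit_hour, COUNT(*) AS page_views
    FROM weblogs_hit
    GROUP BY HOUR(FROM_UNIXTIME(CAST(hit_timestamp AS BIGINT)))
    ORDER BY hit_hour""")
views_by_hour.show()

In Zeppelin these would typically be run as %sql paragraphs so the built-in charting kicks in; the PySpark equivalents are shown here to stay consistent with the rest of the code.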
PySpark code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *

## write snappy compressed output
conf = (SparkConf()
        .setAppName("parse_weblogs")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.cores", 4)
        .set("spark.executor.instances", 20)
        .set("spark.sql.parquet.compression.codec", "snappy")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

## read text file and parse out fields needed
## file is tab delimited
path = "hdfs://ip-000-00-0-00.xxxx.xxxx.internal:8020/landing/weblogs/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(hit_timestamp=p[1],
                                      swid=p[13],
                                      ip_address=p[7],
                                      url=p[12],
                                      user_agent=p[43],
                                      city=p[49],
                                      country=p[50],
                                      state=p[52]))

## create a Data Frame from the fields we parsed
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register Data Frame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## do some basic formatting and convert some values to uppercase
rows = sqlContext.sql("SELECT hit_timestamp, swid, ip_address, url, user_agent, UPPER(city) AS city, UPPER(country) AS country, UPPER(state) AS state FROM weblogs_hit")

## write to 1 parquet file
rows.coalesce(1).write.mode('overwrite').format("parquet").save("/data/weblogs_parsed_parquet")
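The script above stops after writing the Parquet file; the external Hive table mentioned in the synopsis isn't shown. A minimal sketch of that last step, using the HiveContext the script already imports, could look like the following (the table name weblogs_parsed is my own placeholder, not from the original exercise):

## sketch: build an external Hive table over the Parquet output
## (table name is illustrative; all parsed fields were kept as strings)
hive_ctx = HiveContext(sc)
hive_ctx.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS weblogs_parsed (
        hit_timestamp STRING,
        swid          STRING,
        ip_address    STRING,
        url           STRING,
        user_agent    STRING,
        city          STRING,
        country       STRING,
        state         STRING)
    STORED AS PARQUET
    LOCATION '/data/weblogs_parsed_parquet'
""")

Because the table is EXTERNAL, dropping it later leaves the Parquet data in place, and analysts can query it immediately from Hive or Spark SQL.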
Created on 09-19-2017 07:23 AM
Hi,
Could anyone help me get the column names/headers (177 columns) for the downloaded Omniture weblog data from Adobe for a fictional company, and also the link/URL to download the weblogs? This would help a lot.
Thanks in advance.