Created on 05-26-2016 08:58 PM - edited on 02-25-2020 04:25 AM by VidyaSargur
Democratization of big data processing is here with Spark and Data Frames. The days of writing complex parsers to parse and store weblog data are over.
You can use Spark's in-memory processing capabilities to quickly ingest and parse weblog data.
For this example, I downloaded some Omniture weblog data from Adobe for a fictional company.
The full code is at the end of this article. Before the code, I have included some screenshots from Zeppelin, taken while doing this exercise, to showcase how Zeppelin's UI lets you quickly develop, analyze, and visualize your work.
The code is a basic PySpark script to get you started with parsing text files and using Spark with Data Frames.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *

## write snappy compressed output
conf = (SparkConf()
        .setAppName("parse_weblogs")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.cores", 4)
        .set("spark.executor.instances", 20)
        .set("spark.sql.parquet.compression.codec", "snappy")
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

## read text file and parse out fields needed
## file is tab delimited
path = "hdfs://ip-000-00-0-00.xxxx.xxxx.internal:8020/landing/weblogs/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(hit_timestamp=p[1], swid=p[13], ip_address=p[7], url=p[12], user_agent=p[43], city=p[49], country=p[50], state=p[52]))

## create a Data Frame from the fields we parsed
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register the Data Frame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## do some basic formatting and convert some values to uppercase
rows = sqlContext.sql("SELECT hit_timestamp, swid, ip_address, url, user_agent, UPPER(city) AS city, UPPER(country) AS country, UPPER(state) AS state FROM weblogs_hit")

## write to 1 parquet file
rows.coalesce(1).write.mode('overwrite').format("parquet").save("/data/weblogs_parsed_parquet")
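Once the script has run, it is easy to sanity-check the output. Below is a minimal sketch, assuming the same sqlContext and the /data/weblogs_parsed_parquet output path from the script above, that reads the Parquet file back and runs a quick aggregation.

## sketch only: read the parsed output back and verify it with a quick query
## assumes the sqlContext and output path from the script above
parsed = sqlContext.read.parquet("/data/weblogs_parsed_parquet")

## register as a temp table so it can be queried with SQL (or visualized in Zeppelin)
parsed.registerTempTable("weblogs_parsed")

## count hits per country as a simple validation of the parsed fields
hits_by_country = sqlContext.sql(
    "SELECT country, COUNT(*) AS hits FROM weblogs_parsed GROUP BY country ORDER BY hits DESC")
hits_by_country.show(10)

Note that coalesce(1) in the script writes a single Parquet file, which is convenient for a small sample dataset; for larger log volumes you would normally skip the coalesce and let Spark write multiple part files in parallel.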
Created on 09-19-2017 07:23 AM
Hi,
Could anyone help me get the column names/headers (177 columns) for the Omniture weblog data downloaded from Adobe for the fictional company, and also the link/URL to download the web logs? This would help a lot.
Thanks in advance.