Options
- Subscribe to RSS Feed
- Mark as New
- Mark as Read
- Bookmark
- Subscribe
- Printer Friendly Page
- Report Inappropriate Content
Rising Star
Created on 06-29-2017 05:48 PM
import re
from pyspark.sql import Row

# Apache access log line layout. Capture groups, in order: client IP,
# identd, user id, timestamp with UTC offset, HTTP method, endpoint,
# protocol, response code, content size.
# Raw string so the regex escapes are not treated as Python string escapes.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

# Returns a Row containing the parts of the Apache Access Log.
# Raises Exception when the line does not match APACHE_ACCESS_LOG_PATTERN.
def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        # Optionally, you can change this to just ignore if each line of data is not critical.
        # For this example, we want to ensure that the format is consistent.
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ipAddress = match.group(1),
        clientIdentd = match.group(2),
        userId = match.group(3),
        dateTime = match.group(4),
        method = match.group(5),
        endpoint = match.group(6),
        protocol = match.group(7),
        responseCode = int(match.group(8)),
        # int() instead of long(): long() does not exist on Python 3, and int()
        # already promotes to arbitrary precision on Python 2.
        contentSize = int(match.group(9)))

log_files = "hdfs://dataset/apache_logs/"
raw_log_files = sc.textFile(log_files)
raw_log_files.count()

# Parse every line into a Row and expose the result to SQL as "log_data".
parsed_log_files = raw_log_files.map(parse_apache_log_line)
parsed_log_files.toDF().registerTempTable("log_data")

%scala
// HELPER FUNCTION - Register ParseDate UDF to use with queries later

/** Converts an access-log timestamp such as "21/Jun/2014:10:00:00 -0730"
  * into seconds since the Unix epoch.
  *
  * The whole field, including the UTC offset, is handed to the formatter:
  * "HH" reads the hour on a 24-hour clock (with "hh", 12:xx:xx parses as
  * 00:xx:xx), "Z" applies the offset with the correct sign, and the explicit
  * English locale keeps the month abbreviation parseable whatever the JVM
  * default locale is. The result no longer depends on the JVM default time zone.
  *
  * @param rawDate timestamp in "dd/MMM/yyyy:HH:mm:ss Z" form
  * @return seconds since 1970-01-01T00:00:00Z
  * @throws java.text.ParseException if rawDate does not match that form
  */
def parseDate(rawDate: String): Long = {
  // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
  val dtParser = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", java.util.Locale.ENGLISH)
  dtParser.parse(rawDate).getTime / 1000
}

val example = "21/Jun/2014:10:00:00 -0730"
parseDate(example) // 1403371800, i.e. 2014-06-21T17:30:00Z

sqlContext.udf.register("parseDate", parseDate(_:String):Long)

%sql
-- Used the parsed logs and the date helper UDF to execute SQL Query
select responseCode, ipAddress, to_date(cast(parseDate(dateTime) as timestamp)) as date, count(*) as NoOfRequests, sum(contentSize) as TotalContentSize
from log_data
group by responseCode, ipAddress, to_date(cast(parseDate(dateTime) as timestamp))
order by count(*) desc