import re
from pyspark.sql import Row
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'
# Parses one line of the Apache access log and returns a Row with named fields.
def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        # If individual malformed lines are not critical, you could return None here instead.
        # For this example, we want every line to match the expected format.
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ipAddress = match.group(1),
        clientIdentd = match.group(2),
        userId = match.group(3),
        dateTime = match.group(4),
        method = match.group(5),
        endpoint = match.group(6),
        protocol = match.group(7),
        responseCode = int(match.group(8)),
        contentSize = int(match.group(9)))
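# Quick sanity check on a single log line before running against the cluster.
# The sample line below is made up purely for illustration.
sample_line = '127.0.0.1 - frank [21/Jun/2014:10:00:00 -0730] "GET /index.html HTTP/1.1" 200 2326'
parse_apache_log_line(sample_line)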
log_files = "hdfs://dataset/apache_logs/"
raw_log_files = sc.textFile(log_files)
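# count() is an action: it forces Spark to actually read the files, so path or permission problems surface here.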
raw_log_files.count()
parsed_log_files = raw_log_files.map(parse_apache_log_line)
parsed_log_files.toDF().registerTempTable("log_data")
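# Optional sanity check against the freshly registered table; assumes the same
# Spark 1.x sqlContext that backs toDF()/registerTempTable above.
sqlContext.sql("select responseCode, count(*) as requests from log_data group by responseCode").show()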
%scala
// Helper: parseDate converts an Apache log timestamp (e.g. "21/Jun/2014:10:00:00 -0730")
// into epoch seconds (UTC). It is registered below as a UDF for use in SQL queries.
def parseDate(rawDate: String): Long = {
  // Apache access logs use a 24-hour clock, so the pattern needs HH rather than hh.
  val dtParser = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", java.util.Locale.ENGLISH)
  dtParser.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
  val parts = rawDate.split(" ")
  val localTime = parts(0)
  // The zone offset looks like "-0730": hours in the hundreds place, minutes in the remainder.
  val offset = parts(1).toLong
  val hourOffset = (offset / 100).toInt
  val minuteOffset = (offset - hourOffset * 100).toInt
  val totalOffset = hourOffset * 60 * 60 + minuteOffset * 60
  // Local time = UTC + offset, so subtract the offset to normalize to UTC.
  (dtParser.parse(localTime).getTime() / 1000) - totalOffset
}
val example = "21/Jun/2014:10:00:00 -0730"
parseDate(example)
sqlContext.udf.register("parseDate", parseDate(_:String):Long)
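// Optional sanity check that the UDF is visible to SQL; assumes this sqlContext
// is the same one backing the %sql cell below.
sqlContext.sql("select parseDate('21/Jun/2014:10:00:00 -0730') as epochSeconds").show()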
%sql
-- Use the parsed logs and the parseDate UDF to summarize requests per response code, IP address, and day
select responseCode, ipAddress, to_date(cast(parseDate(dateTime) as timestamp)) as date, count(*) as NoOfRequests, sum(contentSize) as TotalContentSize
from log_data
group by responseCode,ipAddress,to_date(cast(parseDate(dateTime) as timestamp))
order by count(*) desc