import re
from pyspark.sql import Row
# Apache access-log line: host, identd, user, [timestamp offset], "method endpoint protocol",
# status code, content size. Raw string so regex escapes like \S and \d are not treated
# as (invalid) Python string escapes.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

# Parses one Apache access-log line into a pyspark Row (not a dict).
def parse_apache_log_line(logline):
    """Parse one line of an Apache access log into a ``pyspark.sql.Row``.

    The line must match ``APACHE_ACCESS_LOG_PATTERN``; its nine capture groups
    become the Row fields, in order.

    :param logline: a single raw log line.
    :return: Row with ipAddress, clientIdentd, userId, dateTime, method,
        endpoint and protocol (strings), responseCode (int) and contentSize
        (long on Python 2, int on Python 3).
    :raises ValueError: if the line does not match the pattern (a subclass of
        Exception, so existing ``except Exception`` handlers still apply).
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        # Optionally, you can change this to just ignore if each line of data is not critical.
        # For this example, we want to ensure that the format is consistent.
        raise ValueError("Invalid logline: %s" % logline)
    try:
        to_long = long  # Python 2 has a separate long type.
    except NameError:
        to_long = int  # Python 3 folded long into int.
    return Row(
        ipAddress    = match.group(1),
        clientIdentd = match.group(2),
        userId       = match.group(3),
        dateTime     = match.group(4),
        method       = match.group(5),
        endpoint     = match.group(6),
        protocol     = match.group(7),
        responseCode = int(match.group(8)),
        contentSize  = to_long(match.group(9)))

# Read the raw access logs as an RDD with one element per log line.
# NOTE(review): in "hdfs://dataset/...", "dataset" is the NameNode host; if a path on the
# default filesystem was intended, this should be "hdfs:///dataset/apache_logs/" - confirm.
log_files = "hdfs://dataset/apache_logs/"
raw_log_files = sc.textFile(log_files)

# Parse every line into a Row; parse_apache_log_line raises on malformed lines,
# so such lines will make the Spark job fail.
# NOTE(review): the SQL query in this notebook reads a table named log_data, which is
# not registered here (e.g. parsed_log_files.toDF().registerTempTable("log_data")) - confirm.
parsed_log_files = raw_log_files.map(parse_apache_log_line)

// HELPER FUNCTION - converts an Apache log timestamp such as "21/Jun/2014:10:00:00 -0730"
// into seconds since the Unix epoch (UTC). Registered as the "parseDate" SQL UDF for later queries.
def parseDate(rawDate: String): Long = {
  // HH (0-23), not hh (1-12): the log clock is 24-hour, and lenient parsing with hh
  // silently turns hour 12 into 00.
  // Z consumes the trailing "+hhmm"/"-hhmm" offset, so the parsed instant is correct
  // regardless of the JVM default time zone (no manual offset arithmetic needed).
  // Locale.ENGLISH: month abbreviations ("Jun") must not depend on the JVM locale.
  val dtParser = new java.text.SimpleDateFormat(
    "dd/MMM/yyyy:HH:mm:ss Z",
    java.util.Locale.ENGLISH)
  // Reject out-of-range fields instead of rolling them over.
  dtParser.setLenient(false)
  dtParser.parse(rawDate).getTime / 1000
}

// Sample timestamp in the Apache access-log format accepted by parseDate.
val example = "21/Jun/2014:10:00:00 -0730"
// Expose parseDate to SQL under the same name so queries can call parseDate(dateTime).
sqlContext.udf.register("parseDate", (raw: String) => parseDate(raw))

-- Use the parsed logs and the parseDate helper UDF to summarise requests per
-- response code, client IP and calendar day. parseDate yields epoch seconds,
-- which are cast to a timestamp and truncated to a date. Busiest groups first.
select
  responseCode,
  ipAddress,
  to_date(cast(parseDate(dateTime) as timestamp)) as date,
  count(*)                                        as NoOfRequests,
  sum(contentSize)                                as TotalContentSize
from log_data
group by
  responseCode,
  ipAddress,
  to_date(cast(parseDate(dateTime) as timestamp))
order by count(*) desc
