Community Articles

Find and share helpful community-sourced technical articles.
Labels (1)
avatar
Rising Star
import re
from pyspark.sql import Row
# Regex for one line of an Apache access log.
# Capture groups, in order: client IP, identd, user id, timestamp with UTC
# offset, HTTP method, endpoint, protocol, 3-digit status code, content size.
# Written as a raw string so that \S, \w, \d and \[ reach the regex engine
# verbatim instead of being read as (invalid) string escape sequences.
# Note: the size group is (\d+), so a line whose size field is "-" does not
# match and is rejected by parse_apache_log_line.
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'


def parse_apache_log_line(logline):
    """Parse one Apache access log line into a ``Row``.

    Args:
        logline: A single line of text from an Apache access log.

    Returns:
        A ``Row`` with the fields ``ipAddress``, ``clientIdentd``, ``userId``,
        ``dateTime``, ``method``, ``endpoint`` and ``protocol`` as strings,
        and ``responseCode`` and ``contentSize`` as integers.

    Raises:
        Exception: If ``logline`` does not match APACHE_ACCESS_LOG_PATTERN.
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        # Optionally, change this to skip the line if individual lines are
        # not critical. For this example we want to ensure that the format
        # is consistent, so a non-matching line is an error.
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ipAddress=match.group(1),
        clientIdentd=match.group(2),
        userId=match.group(3),
        dateTime=match.group(4),
        method=match.group(5),
        endpoint=match.group(6),
        protocol=match.group(7),
        responseCode=int(match.group(8)),
        # int() rather than long(): long does not exist on Python 3, and on
        # Python 2 int() already promotes to long when the value needs it.
        contentSize=int(match.group(9)),
    )

# Location of the raw Apache access logs to analyse.
log_files = "hdfs://dataset/apache_logs/"
# NOTE(review): `sc` is not defined in this file; presumably the SparkContext
# supplied by the notebook environment -- confirm.
raw_log_files = sc.textFile(log_files)
# Count the raw records; the result is not stored anywhere.
raw_log_files.count()

# Apply the parser to every record. parse_apache_log_line raises on any line
# that does not match the access-log pattern.
parsed_log_files = raw_log_files.map(parse_apache_log_line)
# Convert the parsed records to a DataFrame and expose it to SQL as "log_data".
parsed_log_files.toDF().registerTempTable("log_data")


%scala
// HELPER FUNCTION - converts an access-log timestamp to epoch seconds.
// Intended to be registered as the "parseDate" UDF and used in queries later.
//
// rawDate: timestamp followed by a UTC offset, e.g. "21/Jun/2014:10:00:00 -0730".
// Returns: seconds since 1970-01-01T00:00:00Z for the instant rawDate denotes.
// Throws:  java.text.ParseException if rawDate does not match the format.
def parseDate(rawDate:String):Long = {
  // HH is the 24-hour clock; the 12-hour "hh" would read 12:xx:xx as 00:xx:xx.
  // Z consumes the "+hhmm"/"-hhmm" offset as part of the parse, so the result
  // is the true instant and does not depend on the JVM default time zone.
  // Locale.ENGLISH keeps month abbreviations such as "Jun" parseable
  // regardless of the JVM default locale.
  // A formatter is created per call because SimpleDateFormat is not thread-safe.
  val dtParser = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", java.util.Locale.ENGLISH)
  dtParser.parse(rawDate).getTime() / 1000
}


// Sample timestamp with a negative, non-whole-hour UTC offset.
val example = "21/Jun/2014:10:00:00 -0730"
// Call the helper once on the sample as a quick check before registering it.
parseDate(example)
// Register the helper under the SQL function name "parseDate" so that SQL
// queries can call it. NOTE(review): `sqlContext` is not defined in this
// file; presumably supplied by the notebook environment -- confirm.
sqlContext.udf.register("parseDate", parseDate(_:String):Long)


%sql
-- Request count and total content size per (response code, client IP, day),
-- computed from the parsed logs with the parseDate helper UDF.
-- Groups with the most requests come first.
SELECT
  responseCode,
  ipAddress,
  to_date(cast(parseDate(dateTime) AS timestamp)) AS date,
  count(*) AS NoOfRequests,
  sum(contentSize) AS TotalContentSize
FROM log_data
GROUP BY
  responseCode,
  ipAddress,
  to_date(cast(parseDate(dateTime) AS timestamp))
ORDER BY count(*) DESC
2,752 Views
0 Kudos