Created on 05-19-2016 04:33 PM
Example Log
94.158.95.124 - - [24/Feb/2016:00:11:58 -0500] "GET / HTTP/1.1" 200 91966 "http://creativelabs.biz" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0"
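As a quick sanity check, the regex from the program below can be run against this line on its own and should yield eleven captured groups (client IP through user agent). The standalone PatternCheck object here is just an illustrative sketch, not part of the original article:

// Illustrative sketch: apply the same regex used in the program below to the example line
// and print the eleven captured groups.
object PatternCheck {
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

  def main(args: Array[String]): Unit = {
    val line = """94.158.95.124 - - [24/Feb/2016:00:11:58 -0500] "GET / HTTP/1.1" 200 91966 "http://creativelabs.biz" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0""""
    PATTERN.findFirstMatchIn(line) match {
      case Some(m) => (1 to 11).foreach(i => println(s"group($i) = ${m.group(i)}"))
      case None    => println("no match")
    }
  }
}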
SBT
name := "Logs" version := "1.0" scalaVersion := "2.10.6" jarName in assembly := "Logs.jar" libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided" libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided" libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
Scala Program Pieces
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

// Holds one parsed access log record
case class LogRecord(clientIp: String, clientIdentity: String, user: String, dateTime: String,
                     request: String, statusCode: Int, bytesSent: Long, referer: String, userAgent: String)

object Logs {

  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

  // Parse one log line into a LogRecord; lines that fail become "Empty" records and are filtered out later.
  def parseLogLine(log: String): LogRecord = {
    try {
      val res = PATTERN.findFirstMatchIn(log)
      if (res.isEmpty) {
        println("Rejected Log Line: " + log)
        LogRecord("Empty", "-", "-", "", "", -1, -1, "-", "-")
      } else {
        val m = res.get
        // NOTE: HEAD does not have a content size.
        if (m.group(9).equals("-")) {
          LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
            m.group(5), m.group(8).toInt, 0, m.group(10), m.group(11))
        } else {
          LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
            m.group(5), m.group(8).toInt, m.group(9).toLong, m.group(10), m.group(11))
        }
      }
    } catch {
      case e: Exception =>
        println("Exception on line:" + log + ":" + e.getMessage)
        LogRecord("Empty", "-", "-", "", "-", -1, -1, "-", "-")
    }
  }

  //// Main Spark Program
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    Logger.getLogger("com.hortonworks.spark.Logs").setLevel(Level.INFO)
    val log = Logger.getLogger("com.hortonworks.spark.Logs")
    log.info("Started Logs Analysis")

    val sparkConf = new SparkConf().setAppName("Logs")
    sparkConf.set("spark.cores.max", "16")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "Logs")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "false")
    sparkConf.set("spark.shuffle.compress", "true")
    val sc = new SparkContext(sparkConf)

    // Load the raw log file, parse it, and drop the records that failed to parse
    val logFile = sc.textFile("data/access3.log")
    val accessLogs = logFile.map(parseLogLine).filter(!_.clientIp.equals("Empty"))
    log.info("# of Partitions %s".format(accessLogs.partitions.size))

    println("===== Log Count: %s".format(accessLogs.count()))
    accessLogs.take(5).foreach(println)

    try {
      // Examine the data with Spark SQL, then append it to Avro files partitioned by status code
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val df1 = accessLogs.toDF()
      df1.registerTempTable("accessLogsDF")
      df1.printSchema()
      df1.describe("bytesSent").show()
      df1.first()
      df1.head()
      df1.explain()
      df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append)
        .partitionBy("statusCode").avro("avroresults")
    } catch {
      case e: Exception =>
        log.error("Writing files after job. Exception:" + e.getMessage)
        e.printStackTrace()
    }

    // Calculate statistics based on the content size.
    val contentSizes = accessLogs.map(rec => rec.bytesSent)
    val contentTotal = contentSizes.reduce(_ + _)
    println("===== Number of Log Records: %s  Content Size Total: %s, Avg: %s, Min: %s, Max: %s".format(
      contentSizes.count, contentTotal, contentTotal / contentSizes.count,
      contentSizes.min, contentSizes.max))

    sc.stop()
  }
}
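Once the jar is built with sbt-assembly, something along these lines should launch the job; the output path and the local master are illustrative assumptions, not from the original post:

sbt assembly
spark-submit --class Logs --master "local[*]" target/scala-2.10/Logs.jar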
First, I set up a Scala case class that holds the parsed record (clientIp, clientIdentity, user, dateTime, request, statusCode, bytesSent, referer, userAgent).
Next, I have a regex and a method that parse the log, one line at a time, into case classes.
I filter out the empty records.
I use Spark SQL to examine the data; a sample ad hoc query against the registered temp table is sketched below.
Then I write the data out to Avro and finally compute some counts.
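For example, once the DataFrame is registered as accessLogsDF, you can summarize the traffic per HTTP status code. This query is an illustrative sketch and not part of the original program; it reuses the sqlContext defined above:

// Hypothetical follow-on query against the registered temp table:
// request count and total bytes sent per HTTP status code.
val statusSummary = sqlContext.sql(
  """SELECT statusCode, COUNT(*) AS requests, SUM(bytesSent) AS totalBytes
    |FROM accessLogsDF
    |GROUP BY statusCode
    |ORDER BY statusCode""".stripMargin)
statusSummary.show()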
Created on 08-23-2016 03:56 PM
You can also easily save to ORC, Parquet, or another format. I recommend ORC so you can run fast queries from Hive.
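A minimal sketch of the ORC variant, assuming a HiveContext is available (in Spark 1.6 the ORC data source requires Hive support, so the build would also need the spark-hive_2.10 dependency); the output path here is illustrative:

// Hypothetical ORC counterpart of the Avro write above
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
accessLogs.toDF().write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .partitionBy("statusCode")
  .orc("orcresults")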
Created on 10-05-2017 10:21 AM
org.apache.spark.serializer.KryoSerializer
org.apache.spark.{SparkConf, SparkContext}
These imports are present twice.
Created on 11-28-2017 12:32 AM
Oops, drop the second one.