Created on 05-19-2016 04:33 PM
Example Log
94.158.95.124 - - [24/Feb/2016:00:11:58 -0500] "GET / HTTP/1.1" 200 91966 "http://creativelabs.biz" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0"
SBT
name := "Logs"

version := "1.0"

scalaVersion := "2.10.6"

// Key provided by the sbt-assembly plugin; names the fat jar it builds.
jarName in assembly := "Logs.jar"

// Marked "provided": expected to come from the Spark runtime, so left out of the assembly jar.
// %% appends the Scala binary suffix (_2.10) from scalaVersion, keeping all artifacts consistent.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"

// Not "provided", so it is bundled into the assembly jar.
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
Scala Program Pieces
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{KryoSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._
/**
 * One parsed access-log entry.
 *
 * As populated by `Logs.parseLogLine`: `dateTime` is the timestamp text captured between
 * the square brackets, `request` holds only the HTTP method, and `bytesSent` is 0 when the
 * log line shows "-" instead of a byte count.
 */
case class LogRecord(
  clientIp: String,
  clientIdentity: String,
  user: String,
  dateTime: String,
  request: String,
  statusCode: Int,
  bytesSent: Long,
  referer: String,
  userAgent: String
)
object Logs {

  import scala.util.control.NonFatal

  /**
   * Combined Log Format matcher. Capture groups:
   * 1 client IP, 2 identity, 3 user, 4 timestamp (inside the brackets), 5 HTTP method,
   * 6 path, 7 protocol, 8 status code, 9 bytes sent (or "-"), 10 referer, 11 user agent.
   */
  val PATTERN: scala.util.matching.Regex =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

  /** clientIp value marking a line that could not be parsed; [[main]] filters these out. */
  private val EmptyClientIp = "Empty"

  /** The single placeholder record returned for every unparseable line. */
  private val RejectedRecord = LogRecord(EmptyClientIp, "-", "-", "", "-", -1, -1, "-", "-")

  /**
   * Parses one access-log line into a [[LogRecord]].
   *
   * Only the HTTP method (group 5) is stored in `request`; path and protocol are discarded.
   *
   * @param log one raw line of the access log
   * @return the parsed record, or a placeholder whose clientIp is "Empty" when the line does
   *         not match [[PATTERN]] or a field fails to convert (the reason is printed to stdout)
   */
  def parseLogLine(log: String): LogRecord =
    try {
      PATTERN.findFirstMatchIn(log) match {
        case None =>
          println("Rejected Log Line: " + log)
          RejectedRecord
        case Some(m) =>
          // NOTE: HEAD does not have a content size; the log shows "-" instead of a number.
          val bytesSent = if (m.group(9) == "-") 0L else m.group(9).toLong
          LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
            m.group(5), m.group(8).toInt, bytesSent, m.group(10), m.group(11))
      }
    } catch {
      // e.g. a non-numeric byte count in group 9.
      case NonFatal(e) =>
        println("Exception on line:" + log + ":" + e.getMessage)
        RejectedRecord
    }

  /**
   * Main Spark program: parses the access log, inspects it with Spark SQL, writes it out as
   * Avro and prints content-size statistics.
   *
   * @param args optional: args(0) input log path (default "data/access3.log"),
   *             args(1) Avro output directory (default "avroresults")
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    Logger.getLogger("com.hortonworks.spark.Logs").setLevel(Level.INFO)
    val log = Logger.getLogger("com.hortonworks.spark.Logs")
    log.info("Started Logs Analysis")

    val inputPath = args.headOption.getOrElse("data/access3.log")
    val outputPath = args.drop(1).headOption.getOrElse("avroresults")

    val sc = new SparkContext(buildSparkConf())
    try {
      val logFile = sc.textFile(inputPath)
      // Cached because several actions below reuse it; otherwise each one re-reads and re-parses the file.
      val accessLogs = logFile.map(parseLogLine).filter(_.clientIp != EmptyClientIp).cache()
      log.info("# of Partitions %s".format(accessLogs.partitions.size))
      println("===== Log Count: %s".format(accessLogs.count()))
      accessLogs.take(5).foreach(println)
      exploreAndWriteAvro(sc, accessLogs, outputPath, log)
      printContentSizeStats(accessLogs)
    } finally {
      // Release cluster resources even when one of the actions above throws.
      sc.stop()
    }
  }

  /** Builds the Spark configuration used by [[main]]. */
  private def buildSparkConf(): SparkConf =
    new SparkConf()
      .setAppName("Logs")
      .set("spark.cores.max", "16")
      .set("spark.serializer", classOf[KryoSerializer].getName)
      .set("spark.sql.tungsten.enabled", "true")
      .set("spark.eventLog.enabled", "true")
      .set("spark.app.id", "Logs")
      .set("spark.io.compression.codec", "snappy")
      .set("spark.rdd.compress", "false")
      // Previously misspelled "spark.suffle.compress", which Spark silently ignored.
      .set("spark.shuffle.compress", "true")

  /**
   * Examines the records with Spark SQL (schema, bytesSent summary, plan), then appends them as
   * Avro under `outputPath`, partitioned by statusCode. Failures are logged rather than rethrown,
   * so the content-size statistics still run afterwards.
   */
  private def exploreAndWriteAvro(sc: SparkContext, accessLogs: RDD[LogRecord],
                                  outputPath: String, log: Logger): Unit =
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      val df1 = accessLogs.toDF()
      df1.registerTempTable("accessLogsDF")
      df1.printSchema()
      df1.describe("bytesSent").show()
      println("===== First record: " + df1.first())
      df1.explain()
      // .avro(...) (from com.databricks.spark.avro._) sets the format itself.
      df1.write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("statusCode").avro(outputPath)
    } catch {
      case NonFatal(e) =>
        log.error("Writing files after job. Exception:" + e.getMessage, e)
    }

  /**
   * Prints count, total, average (integer division), min and max of bytesSent.
   * Skips the statistics when there are no records: reduce/min/max throw on an empty RDD and
   * the average would divide by zero.
   */
  private def printContentSizeStats(accessLogs: RDD[LogRecord]): Unit = {
    val contentSizes = accessLogs.map(_.bytesSent)
    val recordCount = contentSizes.count()
    if (recordCount == 0L) {
      println("===== Number of Log Records: 0 (no content size statistics)")
    } else {
      val contentTotal = contentSizes.reduce(_ + _)
      println("===== Number of Log Records: %s Content Size Total: %s, Avg: %s, Min: %s, Max: %s".format(
        recordCount,
        contentTotal,
        contentTotal / recordCount,
        contentSizes.min(),
        contentSizes.max()))
    }
  }
}
First, I set up a Scala case class, `LogRecord`, to hold each parsed record (clientIp, clientIdentity, user, dateTime, request, statusCode, bytesSent, referer, userAgent).
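Next, a regex and the `parseLogLine` method parse the log one line at a time into `LogRecord` instances; a line that cannot be parsed becomes a placeholder record whose clientIp is "Empty".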
I then filter out those placeholder ("Empty") records.
I convert the RDD to a DataFrame and use Spark SQL to examine the data (schema, summary statistics for bytesSent, and the query plan).
Then I write the data out as Avro, partitioned by status code, and finally compute some content-size statistics (count, total, average, min and max).
Created on 08-23-2016 03:56 PM
You can also easily save to ORC, Parquet or another format. I recommend ORC so you can do fast queries from Hive.
Created on 12-21-2016 07:29 PM
Created on 10-05-2017 10:21 AM
The imports `org.apache.spark.serializer.KryoSerializer` and `org.apache.spark.{SparkConf, SparkContext}` are each present twice.
Created on 11-28-2017 12:32 AM
Oops, good catch — you can drop the second (duplicate) imports.