Example Log

94.158.95.124 - - [24/Feb/2016:00:11:58 -0500] "GET / HTTP/1.1" 200 91966 "http://creativelabs.biz" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0"

SBT

name := "Logs"
version := "1.0"
scalaVersion := "2.10.6"
jarName in assembly := "Logs.jar"
libraryDependencies  += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"
libraryDependencies  += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided"
libraryDependencies  += "com.databricks" %% "spark-avro" % "2.0.1"

Scala Program Pieces

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

case class LogRecord(clientIp: String, clientIdentity: String, user: String, dateTime: String, request: String, statusCode: Int, bytesSent: Long, referer: String, userAgent: String)

object Logs {
    val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)"""".r

    def parseLogLine(log: String): LogRecord = {
      try {
        val res = PATTERN.findFirstMatchIn(log)

        if (res.isEmpty) {
          println("Rejected Log Line: " + log)
          LogRecord("Empty", "-", "-", "", "",  -1, -1, "-", "-" )
        }
        else {
          val m = res.get
          // NOTE:   HEAD does not have a content size.
          if (m.group(9).equals("-")) {
            LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
              m.group(5), m.group(8).toInt, 0, m.group(10), m.group(11))
          }
          else {
            LogRecord(m.group(1), m.group(2), m.group(3), m.group(4),
              m.group(5), m.group(8).toInt, m.group(9).toLong, m.group(10), m.group(11))
          }
        }
      } catch {
        case e: Exception =>
          println("Exception on line:" + log + ":" + e.getMessage)
          LogRecord("Empty", "-", "-", "", "-", -1, -1, "-", "-")
      }
    }

  //// Main Spark Program
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    Logger.getLogger("com.hortonworks.spark.Logs").setLevel(Level.INFO)

    val log = Logger.getLogger("com.hortonworks.spark.Logs")
    log.info("Started Logs Analysis")

    val sparkConf = new SparkConf().setAppName("Logs")

    sparkConf.set("spark.cores.max", "16")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "Logs")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "false")
    sparkConf.set("spark.suffle.compress", "true")

    val sc = new SparkContext(sparkConf)
    val logFile = sc.textFile("data/access3.log")
    val accessLogs = logFile.map(parseLogLine).filter(!_.clientIp.equals("Empty"))

    log.info("# of Partitions %s".format(accessLogs.partitions.size))

    try {
      println("===== Log Count: %s".format(accessLogs.count()))
      accessLogs.take(5).foreach(println)

      try {
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val df1 = accessLogs.toDF()
        df1.registerTempTable("accessLogsDF")
        df1.printSchema()
        df1.describe("bytesSent").show()
        df1.first()
        df1.head()
        df1.explain()

        df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("statusCode").avro("avroresults")
      } catch {
        case e: Exception =>
          log.error("Writing files after job. Exception:" + e.getMessage);
          e.printStackTrace();
      }

      // Calculate statistics based on the content size.
      val contentSizes = accessLogs.map(log => log.bytesSent)
      val contentTotal = contentSizes.reduce(_ + _)

      println("===== Number of Log Records: %s  Content Size Total: %s, Avg: %s, Min: %s, Max: %s".format(
        contentSizes.count,
        contentTotal,
        contentTotal / contentSizes.count,
        contentSizes.min,
        contentSizes.max))

    } finally {
      sc.stop()  // make sure the SparkContext is stopped even if the job fails
    }
  }
}

First, I define a Scala case class that holds each parsed record (clientIp, clientIdentity, user, dateTime, request, statusCode, bytesSent, referer, userAgent).

Next, a regular expression and a parsing method turn the log file, one line at a time, into instances of that case class.

Records that fail to parse are marked "Empty" and filtered out.

I use Spark SQL to examine the data; once the temp table is registered you can also run ad hoc queries like the one below.
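
This query is illustrative rather than part of the original program; it assumes the accessLogsDF temp table registered above:

// hypothetical aggregation over the parsed records
val topCodes = sqlContext.sql(
  "SELECT statusCode, COUNT(*) AS hits, SUM(bytesSent) AS totalBytes " +
  "FROM accessLogsDF GROUP BY statusCode ORDER BY hits DESC")
topCodes.show()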

Then I write the data out to Avro, partitioned by status code, and finally compute some simple statistics on the bytes sent.
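
To sanity-check the output, the partitioned Avro files can be read back into a DataFrame. This snippet is a sketch, assuming the avroresults path used above:

// read the Avro results back for verification
val avroDF = sqlContext.read.format("com.databricks.spark.avro").load("avroresults")
avroDF.printSchema()
println("Avro rows written: " + avroDF.count())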

Comments

Master Guru

You can also easily save to ORC, Parquet or another format. I recommend ORC so you can do fast queries from Hive.
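
As a sketch of that suggestion (not from the original post): in Spark 1.6 the ORC data source needs Hive support, so you would add spark-hive_2.10 to the SBT dependencies and build the DataFrame from a HiveContext before writing:

// ORC variant -- requires Hive support in Spark 1.6
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val orcDF = accessLogs.toDF()
orcDF.write.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("statusCode").orc("orcresults")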

New Contributor

These imports are present twice:

org.apache.spark.serializer.KryoSerializer
org.apache.spark.{SparkConf, SparkContext}

Master Guru

Oops, drop the second one.