Flume, Spark Streaming with MySQL: log file and table data are not in sync/consistent


I am currently working on a use case that involves Flume + Spark + MySQL. My job reads a Tomcat access log file using Flume, processes the data in Spark Streaming, and inserts it, in the proper format, into a MySQL table. Everything appears to run correctly, but I found that my access log and the MySQL table data are not properly in sync.

My Access Log File Data

174.37.196.221 - - [15/Sep/2017:00:06:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.222 - - [15/Sep/2017:00:10:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.223 - - [15/Sep/2017:00:11:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.224 - - [15/Sep/2017:00:12:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987

Whereas my MySQL table is as follows:

#   id_pk   ip                   requestdateTime                                    request                                        status
 ===========================================================================================================================================
1   150   174.37.196.221    [15/Sep/2017:00:06:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
2   151   174.37.196.221    [15/Sep/2017:00:06:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
3   148   174.37.196.222    [15/Sep/2017:00:10:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
4   152   174.37.196.222    [15/Sep/2017:00:10:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
5   149   174.37.196.223    [15/Sep/2017:00:11:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
6   153   174.37.196.223    [15/Sep/2017:00:11:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
7   154   174.37.196.224    [15/Sep/2017:00:12:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200

Please note the difference between the log file and the table data. In the table, the IP addresses 174.37.196.221, 174.37.196.222 and 174.37.196.223 each have a double entry, whereas in the log file each appears only once.
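To confirm the extra rows, a simple grouping query against the table (named requestlog, as in the Spark code below) shows how many copies of each request were inserted:

SELECT ip, requestdatetime, COUNT(*) AS copies
FROM requestlog
GROUP BY ip, requestdatetime
HAVING COUNT(*) > 1;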

My Flume conf file is as follows:

flumePullAgent.sources.nc1.type = exec
flumePullAgent.sources.nc1.command = tail -F /home/hduser/test.txt

# Describe the sink
flumePullAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
flumePullAgent.sinks.spark.hostname = 192.168.56.102
flumePullAgent.sinks.spark.port = 33333
flumePullAgent.sinks.spark.channel = m1

flumePullAgent.sinks.log.type = logger

# Use a channel which buffers events in memory
flumePullAgent.channels.m1.type = memory
flumePullAgent.channels.m1.capacity = 1000
flumePullAgent.channels.m1.transactionCapacity = 100
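For completeness: a Flume agent definition normally also lists its components and binds the source to the channel. If these lines are genuinely missing from the conf (rather than just omitted from the post), the agent would not start cleanly. A minimal sketch using the same names:

flumePullAgent.sources = nc1
flumePullAgent.sinks = spark log
flumePullAgent.channels = m1
flumePullAgent.sources.nc1.channels = m1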

And my Spark code is something like this:

// Pull events from the Flume SparkSink and print a per-batch event count
val stream = FlumeUtils.createPollingStream(ssc, host, port)
stream.count().map(cnt => "Received " + cnt + " flume events ---->>>>>>>>.").print()

// For every micro-batch, open one JDBC connection per partition and insert each matching line
stream.foreachRDD { rdd =>
  rdd.foreachPartition { it =>
    val conn = DriverManager.getConnection(MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD)
    val del = conn.prepareStatement(
      "INSERT INTO requestlog (ip, requestdatetime, request, status) VALUES (?,?,?,?)")
    for (tuple <- it) {
      // The Flume event body is raw bytes; turn it back into the original log line
      val strVal = new String(tuple.event.getBody.array())
      println("Printing for each RDD: " + strVal)
      val matcher: Matcher = patternU.matcher(strVal)
      if (matcher.matches()) {
        println("Match Found")
        // Groups used: 1 = ip, 3 = timestamp, 4 = request, 5 = status
        val logdataObj = new logData(matcher.group(1), matcher.group(3), matcher.group(4), matcher.group(5))
        del.setString(1, logdataObj.ip)
        del.setString(2, logdataObj.requestdateTime)
        del.setString(3, logdataObj.request)
        del.setString(4, logdataObj.status)
        del.executeUpdate
      } else {
        println("No Match Found")
      }
    }
    conn.close()
  }
}
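The post does not show patternU or logData. For illustration only, a hypothetical definition that would match the access-log lines above and expose groups 1, 3, 4 and 5 as used in the loop might look like this:

import java.util.regex.Pattern

// Hypothetical sketch: groups are 1=ip, 2=identd/user, 3=[timestamp], 4="request", 5=status, 6=bytes
case class logData(ip: String, requestdateTime: String, request: String, status: String)

val patternU: Pattern = Pattern.compile(
  """^(\S+) (\S+ \S+) (\[[^\]]+\]) ("[^"]*") (\d{3}) (\S+)$""")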

Can anyone help me find where I made a mistake, and why my log data and table data are not in sync? Is this due to the "tail" command? My expectation was that the table would have the same entries with the same frequency as the log file; only then can we do a proper analysis of the access data for our API server.
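In the meantime, one sketch of a workaround (purely illustrative, assuming a unique key can be declared over the columns that identify a request, and that request is a string column so it needs a prefix length for the index) is to let MySQL silently drop re-delivered rows:

ALTER TABLE requestlog
  ADD UNIQUE KEY uniq_request (ip, requestdatetime, request(191));

-- and use an idempotent statement in the Spark job instead of the plain INSERT:
INSERT IGNORE INTO requestlog (ip, requestdatetime, request, status)
VALUES (?, ?, ?, ?);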

Thanks in advance...

1 REPLY

Did you find an answer to this problem? I'm having the same problem. Please help me.

Thank you.
