
Flume, Spark Streaming with MySQL: log file and table data are not in sync/consistent


New Contributor

Currently I am working on a use case that includes Flume + Spark + MySQL. My job is to read a Tomcat access log file using Flume, process the data in Spark Streaming, and insert that data, in the proper format, into a MySQL table. Everything appears to work, but I found that my access log and my MySQL table data are not in sync.

My Access Log File Data

174.37.196.221 - - [15/Sep/2017:00:06:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.222 - - [15/Sep/2017:00:10:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.223 - - [15/Sep/2017:00:11:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.224 - - [15/Sep/2017:00:12:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987

Whereas my MySQL table is as follows...

#   id_pk   ip                   requestdateTime                                    request                                        status
 ===========================================================================================================================================
1   150   174.37.196.221    [15/Sep/2017:00:06:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
2   151   174.37.196.221    [15/Sep/2017:00:06:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
3   148   174.37.196.222    [15/Sep/2017:00:10:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
4   152   174.37.196.222    [15/Sep/2017:00:10:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
5   149   174.37.196.223    [15/Sep/2017:00:11:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
6   153   174.37.196.223    [15/Sep/2017:00:11:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200
7   154   174.37.196.224    [15/Sep/2017:00:12:00 +0000]     "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"     200

Please note the difference between the log file and the table data. In the table, the IP addresses 174.37.196.221, 174.37.196.222 and 174.37.196.223 each have a double entry, whereas in the log file each appears only once.
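To quantify the mismatch I use a small JDBC check like the one below. This is only a rough sketch: it assumes the requestlog table and column names shown above, and it reuses the same connection constants (MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD) as my streaming job.

import java.sql.DriverManager

// Rough sketch: list (ip, requestdatetime, request) combinations that occur
// more than once in requestlog, together with their row counts.
val conn = DriverManager.getConnection(MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD)
try {
  val stmt = conn.prepareStatement(
    """SELECT ip, requestdatetime, request, COUNT(*) AS cnt
      |FROM requestlog
      |GROUP BY ip, requestdatetime, request
      |HAVING COUNT(*) > 1""".stripMargin)
  val rs = stmt.executeQuery()
  while (rs.next()) {
    println(s"${rs.getString("ip")} ${rs.getString("requestdatetime")} -> ${rs.getInt("cnt")} rows")
  }
} finally {
  conn.close()
}

Running this against the table above reports the three IPs that were inserted twice.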

My flume conf file is as follows ...

# Describe the source
flumePullAgent.sources.nc1.type = exec
flumePullAgent.sources.nc1.command = tail -F /home/hduser/test.txt

# Describe the sinks
flumePullAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
flumePullAgent.sinks.spark.hostname = 192.168.56.102
flumePullAgent.sinks.spark.port = 33333
flumePullAgent.sinks.spark.channel = m1

flumePullAgent.sinks.log.type = logger

# Use a channel which buffers events in memory
flumePullAgent.channels.m1.type = memory
flumePullAgent.channels.m1.capacity = 1000
flumePullAgent.channels.m1.transactionCapacity = 100

And my Spark code is something like this:

val stream = FlumeUtils.createPollingStream(ssc, host, port)
stream.count().map(cnt => "Received " + cnt + " flume events ---->>>>>>>>.").print()

stream.foreachRDD { rdd =>
  rdd.foreachPartition { it =>
    // One JDBC connection and prepared statement per partition
    val conn = DriverManager.getConnection(MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD)
    val del = conn.prepareStatement(
      "INSERT INTO requestlog (ip, requestdatetime, request, status) VALUES (?, ?, ?, ?)")

    for (tuple <- it) {
      // Each Flume event body is one access-log line
      val strVal = new String(tuple.event.getBody.array())
      println("Printing for each RDD: " + strVal)

      // Parse the line with the access-log regex and insert the captured fields
      val matcher: Matcher = patternU.matcher(strVal)
      if (matcher.matches()) {
        println("Match Found")
        val logdataObj = new logData(matcher.group(1), matcher.group(3), matcher.group(4), matcher.group(5))
        del.setString(1, logdataObj.ip)
        del.setString(2, logdataObj.requestdateTime)
        del.setString(3, logdataObj.request)
        del.setString(4, logdataObj.status)
        del.executeUpdate()
      } else {
        println("No Match Found")
      }
    }
    conn.close()
  }
}

Can anyone help me find out where I made the mistake? Why are my log data and table data not in sync? Is this due to the "tail" command? My expectation was that the table would contain the same entries, with the same frequency, as the log file. Only then can we do proper analysis of the access data for our API server.
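One workaround I am considering, in case the duplicates come from replayed Flume events (at-least-once delivery), is to make the insert idempotent. A rough sketch follows; it assumes a UNIQUE key on (ip, requestdatetime, request) is added to requestlog, which is not in my current schema.

// Rough sketch: idempotent insert, assuming requestlog has a UNIQUE key on
// (ip, requestdatetime, request). A replayed event then updates the existing
// row instead of creating a second one.
val del = conn.prepareStatement(
  """INSERT INTO requestlog (ip, requestdatetime, request, status)
    |VALUES (?, ?, ?, ?)
    |ON DUPLICATE KEY UPDATE status = VALUES(status)""".stripMargin)

With this, even if tail -F or a retried Spark batch re-delivers a line, the table keeps only one row for it, but I would still like to understand why the duplicates appear in the first place.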

Thanks in advance...

1 REPLY

Re: Flume, Spark Streaming with MySQL: log file and table data are not in sync/consistent

New Contributor

Did you find an answer to this problem? I'm having the same problem. Please help me.

Thank you.
