Created 09-15-2017 02:22 AM
Currently I am working on a use case that involves Flume + Spark + MySQL. My job is to read a Tomcat access log file using Flume, process the data in Spark Streaming, and insert it, in the proper format, into a MySQL table. Everything is working, but somehow I found that my access log and my MySQL table data are not in sync.
My access log file data:

174.37.196.221 - - [15/Sep/2017:00:06:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.222 - - [15/Sep/2017:00:10:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.223 - - [15/Sep/2017:00:11:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
174.37.196.224 - - [15/Sep/2017:00:12:00 +0000] "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1" 200 987
Whereas my MySQL table is as follows...
#   id_pk   ip              requestdateTime                request                                                               status
====================================================================================================================================
1   150     174.37.196.221  [15/Sep/2017:00:06:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
2   151     174.37.196.221  [15/Sep/2017:00:06:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
3   148     174.37.196.222  [15/Sep/2017:00:10:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
4   152     174.37.196.222  [15/Sep/2017:00:10:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
5   149     174.37.196.223  [15/Sep/2017:00:11:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
6   153     174.37.196.223  [15/Sep/2017:00:11:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
7   154     174.37.196.224  [15/Sep/2017:00:12:00 +0000]   "GET /cs/v1/points/bal?memberId=2164699082&accountType=10 HTTP/1.1"  200
Please note the difference between the log file and the table data: in the table, IP addresses 174.37.196.221, 174.37.196.222, and 174.37.196.223 each appear twice, whereas in the log file each appears only once.
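As a quick sanity check, the duplicates can be counted directly in MySQL. This is only a diagnostic sketch that reuses the connection constants from the job below; the GROUP BY columns are an assumption about what identifies a log line:

import java.sql.DriverManager

// Diagnostic only: list (ip, requestdatetime) pairs that were inserted more than once.
val conn = DriverManager.getConnection(MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD)
val check = conn.prepareStatement(
  "SELECT ip, requestdatetime, COUNT(*) AS cnt FROM requestlog " +
  "GROUP BY ip, requestdatetime, request, status HAVING COUNT(*) > 1")
val rs = check.executeQuery()
while (rs.next()) {
  println(rs.getString("ip") + " " + rs.getString("requestdatetime") + " x" + rs.getInt("cnt"))
}
rs.close(); check.close(); conn.close()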
My Flume conf file is as follows...
flumePullAgent.sources.nc1.type = exec
flumePullAgent.sources.nc1.command = tail -F /home/hduser/test.txt

# Describe the sink
flumePullAgent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
flumePullAgent.sinks.spark.hostname = 192.168.56.102
flumePullAgent.sinks.spark.port = 33333
flumePullAgent.sinks.spark.channel = m1
flumePullAgent.sinks.log.type = logger

# Use a channel which buffers events in memory
flumePullAgent.channels.m1.type = memory
flumePullAgent.channels.m1.capacity = 1000
flumePullAgent.channels.m1.transactionCapacity = 100
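One thing worth checking here: the exec source offers no delivery guarantees, and if the agent or the tail process restarts, tail -F first re-emits the last lines of the file, which produces exactly this kind of duplicate. A possible alternative is the Taildir source (Flume 1.7+), which persists its read position and resumes instead of replaying. This is a minimal sketch, not a tested configuration; the filegroup name and positionFile path are assumptions:

flumePullAgent.sources.nc1.type = TAILDIR
flumePullAgent.sources.nc1.filegroups = fg1
flumePullAgent.sources.nc1.filegroups.fg1 = /home/hduser/test.txt
# Where Taildir persists the last read offset, so a restart resumes instead of replaying
flumePullAgent.sources.nc1.positionFile = /home/hduser/.flume/taildir_position.json
flumePullAgent.sources.nc1.channels = m1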
And my Spark code is something like this:
val stream = FlumeUtils.createPollingStream(ssc, host, port)
stream.count().map(cnt => "Received " + cnt + " flume events ---->>>>>>>>.").print()

stream.foreachRDD { rdd =>
  rdd.foreachPartition { it =>
    // One JDBC connection and one prepared statement per partition
    val conn = DriverManager.getConnection(MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD)
    val del = conn.prepareStatement(
      "INSERT INTO requestlog (ip, requestdatetime, request, status) VALUES (?,?,?,?)")
    for (tuple <- it) {
      val strVal = new String(tuple.event.getBody.array())
      println("Printing for each RDD: " + strVal)
      val matcher: Matcher = patternU.matcher(strVal)
      if (matcher.matches()) {
        println("Match Found")
        val logdataObj = new logData(matcher.group(1), matcher.group(3), matcher.group(4), matcher.group(5))
        del.setString(1, logdataObj.ip)
        del.setString(2, logdataObj.requestdateTime)
        del.setString(3, logdataObj.request)
        del.setString(4, logdataObj.status)
        del.executeUpdate()
      } else {
        println("No Match Found")
      }
    }
    del.close()
    conn.close()
  }
}
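If the duplicates come from replayed events (the polling receiver can re-deliver a batch after a failure, and a restarted tail -F re-emits lines), one way to make the sink tolerant is an idempotent insert: put a unique key on the natural identity of a log line and let MySQL skip replays. A minimal sketch, assuming a unique index like the one in the comment is acceptable for this table (the index name and prefix length are assumptions); it would replace the plain INSERT above:

// One-time DDL (assumed; adjust the prefix length to your schema):
//   ALTER TABLE requestlog
//     ADD UNIQUE KEY uq_requestlog (ip, requestdatetime, request(191), status);

val ins = conn.prepareStatement(
  "INSERT IGNORE INTO requestlog (ip, requestdatetime, request, status) VALUES (?,?,?,?)")
ins.setString(1, logdataObj.ip)
ins.setString(2, logdataObj.requestdateTime)
ins.setString(3, logdataObj.request)
ins.setString(4, logdataObj.status)
// executeUpdate() returns 0 when the row already exists, so a replayed event becomes a no-op
ins.executeUpdate()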
Can anyone help me find where I made the mistake? Why are my log data and table data not in sync? Is this due to the "tail" command? My expectation was that the table would contain the same entries, with the same frequency, as the log file; only then can we do proper analysis of the access data for our API server.
Thanks in advance...
Created 04-10-2019 05:13 PM
Did you find an answer to this problem? I'm having the same problem. Please help me.
Thank you.