I am working with Spark Streaming and have a requirement to perform an aggregation on my data stream. Specifically, I need to count the occurrences of different statuses from the last 5 minutes and then display the result on the console.
The data is coming from a Kafka topic. Each record has a timestamp, and I want to filter the records such that only those falling within the last 5 minutes (from the current timestamp) are considered for aggregation.
Here's the desired outcome:
If the batch executes at 6:10 PM, it should consider only the data from the last 5 minutes. Similarly, if it executes at 6:11 PM, again it should only consider the data from the last 5 minutes.
`Example:
For 6:10 PM:
TXN00053,pending,2023-09-05 18:05:45
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
For 6:11 PM:
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
TXN00054,failed,2023-09-05 18:11:00 `
I tried implementing this with the following code, but it isn't working as expected:
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupState
import spark.implicits._
val currentTimestamp = current_timestamp()
val sourceStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "")
.option("subscribe", "20230907_3")
.option("startingOffsets", "earliest")
.load()
val formattedData = sourceStream.selectExpr("CAST(value AS STRING) as message")
.select(
split($"message", ",").getItem(0).as("txn_id"),
split($"message", ",").getItem(1).as("status"),
split($"message", ",").getItem(2).as("txn_timestamp")
)
.withColumn("txn_timestamp", to_timestamp($"txn_timestamp", "yyyy-MM-dd HH:mm:ss"))
val recentData = formattedData.filter($"txn_timestamp" >= (currentTimestamp - expr("INTERVAL 5 MINUTES")))
val aggregatedData = recentData.groupBy($"status").agg(count($"status").as("count"))
val query = aggregatedData.writeStream
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("1 minute"))
.option("truncate", "false")
.start()
.awaitTermination()
```
Can anyone help me understand what I'm doing wrong and how to achieve the desired outcome? Thanks in advance!
Note : when i am using **complete mode** then basically whatever the output is coming from batch one , then in final count it's not removing the old events instead increasing the count with new records and giving result,
I tried with **update mode** then it's only giving records where any new update came in records and not giving any other records which comes under in last 5 min,
can anyone help so it can give the o/p based on combine approach of update and complete.
TXN00053,pending,2023-09-05 18:05:45
TXN00054,pending,2023-09-05 18:05:50
TXN00054,pending,2023-09-05 18:06:50
TXN00054,pending,2023-09-05 18:11:00`
TXN00054,failed,2023-09-05 18:06:45
TXN00054,accepted,2023-09-05 18:11:00`
**Expected Result from above data:**
For 6:10 PM:
pending,3
failed,1
For 6:11 PM: (as 18:05:45 and 18:05:50 with status pending is older then 5 min so it shd'nt be part of agg.)
pending,2
failed,1
accepted,1