Spark Streaming Aggregation: Counting Only the Last 5 Minutes of Data

I am working with Spark Streaming and have a requirement to perform an aggregation on my data stream. Specifically, I need to count the occurrences of different statuses from the last 5 minutes and then display the result on the console.

The data is coming from a Kafka topic. Each record has a timestamp, and I want to filter the records such that only those falling within the last 5 minutes (from the current timestamp) are considered for aggregation.

Here's the desired outcome:

If the batch executes at 6:10 PM, it should consider only the data from the last 5 minutes. Similarly, if it executes at 6:11 PM, again it should only consider the data from the last 5 minutes.

Example:

For 6:10 PM:

TXN00053,pending,2023-09-05 18:05:45
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00

 

For 6:11 PM:

TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
TXN00054,failed,2023-09-05 18:11:00

I tried implementing this with the following code, but it isn't working as expected:


```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupState

import spark.implicits._

val currentTimestamp = current_timestamp()

// Read raw records from the Kafka topic
val sourceStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "20230907_3")
  .option("startingOffsets", "earliest")
  .load()

// Parse each CSV message into txn_id, status, txn_timestamp
val formattedData = sourceStream.selectExpr("CAST(value AS STRING) as message")
  .select(
    split($"message", ",").getItem(0).as("txn_id"),
    split($"message", ",").getItem(1).as("status"),
    split($"message", ",").getItem(2).as("txn_timestamp")
  )
  .withColumn("txn_timestamp", to_timestamp($"txn_timestamp", "yyyy-MM-dd HH:mm:ss"))

// Keep only records whose timestamp is within 5 minutes of the current timestamp
val recentData = formattedData.filter($"txn_timestamp" >= (currentTimestamp - expr("INTERVAL 5 MINUTES")))

// Count records per status
val aggregatedData = recentData.groupBy($"status").agg(count($"status").as("count"))

// Print the result table to the console once per minute
val query = aggregatedData.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
```

Can anyone help me understand what I'm doing wrong and how to achieve the desired outcome? Thanks in advance!

Note: When I use **complete mode**, the counts from earlier batches are never dropped. Each new batch just adds its records to the running totals, so old events stay in the final count.
With **update mode**, the output contains only the statuses whose counts changed in that batch, not the other statuses that still have records within the last 5 minutes.
Is there a way to get output that combines the two approaches: the full result table as in complete mode, but covering only the last 5 minutes of data?

Sample input:

TXN00053,pending,2023-09-05 18:05:45
TXN00054,pending,2023-09-05 18:05:50
TXN00054,pending,2023-09-05 18:06:50
TXN00054,pending,2023-09-05 18:11:00
TXN00054,failed,2023-09-05 18:06:45
TXN00054,accepted,2023-09-05 18:11:00

**Expected Result from above data:**

For 6:10 PM:

pending,3
failed,1

For 6:11 PM (the pending records at 18:05:45 and 18:05:50 are older than 5 minutes, so they shouldn't be part of the aggregation):

pending,2
failed,1
accepted,1
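
To pin down the semantics I am after, here is a minimal plain-Scala sketch (no Spark involved) that reproduces the expected counts from the sample input. The names `LastFiveMinutes` and `countsAt` are hypothetical and exist only for illustration. It assumes the window is inclusive on both ends, i.e. from (batch time minus 5 minutes) up to and including the batch time, and that the batch fires exactly on the minute.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, only meant to illustrate the desired result.
object LastFiveMinutes {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Sample input from this post: txn_id,status,txn_timestamp
  val records: Seq[String] = Seq(
    "TXN00053,pending,2023-09-05 18:05:45",
    "TXN00054,pending,2023-09-05 18:05:50",
    "TXN00054,pending,2023-09-05 18:06:50",
    "TXN00054,pending,2023-09-05 18:11:00",
    "TXN00054,failed,2023-09-05 18:06:45",
    "TXN00054,accepted,2023-09-05 18:11:00"
  )

  // Count records per status whose timestamp lies in
  // [batchTime - 5 minutes, batchTime] (assumption: both ends inclusive).
  def countsAt(batchTime: LocalDateTime): Map[String, Int] = {
    val lowerBound = batchTime.minusMinutes(5)
    records
      .map(_.split(","))
      .map(fields => (fields(1), LocalDateTime.parse(fields(2), fmt)))
      .filter { case (_, ts) => !ts.isBefore(lowerBound) && !ts.isAfter(batchTime) }
      .groupBy { case (status, _) => status }
      .map { case (status, rows) => status -> rows.size }
  }
}
```

Calling `LastFiveMinutes.countsAt` with 18:10:00 and then 18:11:00 on 2023-09-05 gives the two result sets listed above. The key point is that the counts are recomputed from scratch for each batch time, so records that have aged out of the 5-minute range no longer contribute.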

 
