Member since: 03-16-2020
Posts: 11
Kudos Received: 0
Solutions: 1

My Accepted Solutions

| Title | Views | Posted |
|---|---|---|
| | 2143 | 09-07-2023 07:39 PM |
09-07-2023
07:39 PM
@RangaReddy Thanks, it was a server-level issue. I tried with a different edge node and it worked.
09-07-2023
07:37 PM
I am working with Spark Structured Streaming and have a requirement to perform an aggregation on my data stream. Specifically, I need to count the occurrences of the different statuses seen in the last 5 minutes and display the result on the console. The data comes from a Kafka topic. Each record has a timestamp, and I want to filter the records so that only those falling within the last 5 minutes (relative to the current timestamp) are considered for aggregation.

Here is the desired outcome: if the batch executes at 6:10 PM, it should consider only the data from the last 5 minutes; similarly, if it executes at 6:11 PM, it should again consider only the data from the last 5 minutes.

Example, for 6:10 PM:

```
TXN00053,pending,2023-09-05 18:05:45
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
```

For 6:11 PM:

```
TXN00054,failed,2023-09-05 18:06:45
TXN00054,failed,2023-09-05 18:07:45
TXN00054,failed,2023-09-05 18:08:00
TXN00054,failed,2023-09-05 18:09:00
TXN00054,failed,2023-09-05 18:10:00
TXN00054,failed,2023-09-05 18:11:00
```

I tried implementing this with the following code, but it isn't working as expected:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

val currentTimestamp = current_timestamp()

val sourceStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "")
  .option("subscribe", "20230907_3")
  .option("startingOffsets", "earliest")
  .load()

val formattedData = sourceStream.selectExpr("CAST(value AS STRING) as message")
  .select(
    split($"message", ",").getItem(0).as("txn_id"),
    split($"message", ",").getItem(1).as("status"),
    split($"message", ",").getItem(2).as("txn_timestamp")
  )
  .withColumn("txn_timestamp", to_timestamp($"txn_timestamp", "yyyy-MM-dd HH:mm:ss"))

val recentData = formattedData.filter($"txn_timestamp" >= (currentTimestamp - expr("INTERVAL 5 MINUTES")))

val aggregatedData = recentData.groupBy($"status").agg(count($"status").as("count"))

val query = aggregatedData.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
```

Can anyone help me understand what I'm doing wrong and how to achieve the desired outcome? Thanks in advance!

Note: with **complete mode**, the final count keeps growing as new batches arrive, because old events are never removed from the aggregation. With **update mode**, only the records whose counts changed in that batch are emitted, and the other records that still fall within the last 5 minutes are not shown. Can anyone suggest how to get output that combines both behaviours, i.e. counts over exactly the last 5 minutes?

```
TXN00053,pending,2023-09-05 18:05:45
TXN00054,pending,2023-09-05 18:05:50
TXN00054,pending,2023-09-05 18:06:50
TXN00054,pending,2023-09-05 18:11:00
TXN00054,failed,2023-09-05 18:06:45
TXN00054,accepted,2023-09-05 18:11:00
```

**Expected result from the above data:**

For 6:10 PM:

```
pending,3
failed,1
```

For 6:11 PM (the 18:05:45 and 18:05:50 records with status pending are older than 5 minutes, so they shouldn't be part of the aggregation):

```
pending,2
failed,1
accepted,1
```
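Not an answer from this thread, just a minimal sketch of one common way to express a "counts over the last 5 minutes, re-evaluated every minute" requirement: a sliding event-time window with a watermark, rather than filtering on `current_timestamp()` (that filter only sees rows as they arrive in each micro-batch, while the running aggregation keeps the old rows' contribution). `formattedData` below refers to the parsed stream defined in the question; the 10-minute watermark and the window/slide durations are assumptions, not values from the thread.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Sketch only: assumes `formattedData` (columns txn_id, status,
// txn_timestamp as TimestampType) is the parsed stream from the question,
// built in the same session.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// 5-minute windows that slide every minute, keyed additionally by status.
// The watermark lets Spark eventually drop state for windows that can no
// longer change (10 minutes here is an assumed tolerance for late data).
val slidingCounts = formattedData
  .withWatermark("txn_timestamp", "10 minutes")
  .groupBy(window($"txn_timestamp", "5 minutes", "1 minute"), $"status")
  .agg(count("*").as("count"))

val query = slidingCounts.writeStream
  .outputMode("update")                     // emit only windows whose counts changed in this trigger
  .format("console")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
```

With a 5-minute window sliding every 1 minute, the row printed for the most recent window approximates "the last 5 minutes" at each trigger; whether that is close enough depends on how strictly the 5-minute boundary has to track the wall clock rather than the event timestamps.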
Labels:
- Apache Spark
04-28-2022
04:54 AM
@Rohan44 Maybe you have a typo above, but try: `SELECT COUNT(*) FROM FLOWFILE`
09-22-2021
06:19 AM
@Rohan44 Please test this command once before you run it on actual data. You could also take a backup of the HDFS data, to be safe.
03-16-2020
05:22 AM
I'm also getting the same issue, but in my case, instead of picking up my username, it shows the user as Anonymous. Can anyone suggest how I should resolve this?