
Spark Streaming. Window and watermark function without aggregation

New Contributor

As I understand from the documentation, in order to use the window and watermark functions, you must perform an aggregation.

In my case I have a stream of log records in the format:

|name|type|action|received_time|....another 60 columns

I want to split the data into 10-minute buckets (12:00, 12:10, ...). Is there a way to use the window and watermark functionality without aggregation?

1 REPLY

Cloudera Employee

You could probably use groupByKey to group your data into windows and apply your custom functions over the values in each window. For the watermark you will need to maintain custom state and use mapGroupsWithState.

Here's an example that groups words by window:

import java.sql.Timestamp
import org.apache.spark.sql.functions.window
import spark.implicits._

case class Window(start: Timestamp, end: Timestamp)

val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", true)
  .load()

// Split each line into (word, timestamp) pairs
val words = lines.as[(String, Timestamp)]
  .flatMap { case (line, ts) => line.split(" ").map(word => (word, ts)) }
  .toDF("word", "timestamp")

// Assign each word to a 10-second tumbling window, then collect
// the words that fall in each window into a list
words.select(window($"timestamp", "10 seconds"), $"word").as[(Window, String)]
  .groupByKey(_._1)
  .mapGroups((k, xs) => (k, xs.map(_._2).toList))
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()

This produces output like:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+---------+
|_1                                        |_2       |
+------------------------------------------+---------+
|[1970-01-18 09:03:30, 1970-01-18 09:03:40]|[a, b, c]|
+------------------------------------------+---------+
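For the watermark part, here is a minimal sketch of the mapGroupsWithState approach, building on the `words` Dataset from the example above. Note that an event-time timeout (GroupStateTimeout.EventTimeTimeout) does require a withWatermark call on the input, but no aggregation. The WindowState case class and the "1 minute" lateness threshold are my own illustrative choices, not something from your job:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import org.apache.spark.sql.functions.window
import spark.implicits._

case class Window(start: Timestamp, end: Timestamp)
// Hypothetical custom state: the words seen so far in a window
case class WindowState(words: List[String])

words
  .withWatermark("timestamp", "1 minute") // tolerate 1 minute of lateness
  .select(window($"timestamp", "10 minutes"), $"word").as[(Window, String)]
  .groupByKey(_._1)
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout) {
    (win: Window, rows: Iterator[(Window, String)], state: GroupState[WindowState]) =>
      if (state.hasTimedOut) {
        // The watermark has passed the timeout: emit the final window
        // contents and drop the state
        val result = (win, state.get.words)
        state.remove()
        result
      } else {
        // Accumulate the new words into the custom state
        val updated = state.getOption.getOrElse(WindowState(Nil)).words ++ rows.map(_._2)
        state.update(WindowState(updated))
        // Time out once the watermark moves past the window end
        state.setTimeoutTimestamp(win.end.getTime)
        (win, updated)
      }
  }
  .writeStream
  .outputMode(OutputMode.Update) // mapGroupsWithState requires Update mode
  .format("console")
  .start()
```

The rows for a group arrive incrementally across micro-batches, so the state carries the partial word list until the watermark passes the window end, at which point the group times out and the final list is emitted.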