You could probably use groupByKey to group your data into windows and apply your custom functions over the values in each window. For watermark-like behaviour you will need to maintain custom state with mapGroupsWithState.
Here's an example that groups the words by window:
case class Window(start: Timestamp, end: Timestamp)
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load()
val words = lines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ")
.map(word => (word, line._2)))
.toDF("word", "timestamp")
words.select(window($"timestamp", "10 seconds", "10 seconds"), $"word").as[(Window, String)]
.groupByKey(r => r._1)
.mapGroups((k, xs) => (k, xs.map(v => v._2).toList))
.writeStream
.format("console")
.option("truncate", "false")
.start()
This produces output like:
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+---------+
|_1                                        |_2       |
+------------------------------------------+---------+
|[1970-01-18 09:03:30, 1970-01-18 09:03:40]|[a, b, c]|
+------------------------------------------+---------+
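For the custom-state part mentioned above, here is a rough, untested sketch of how mapGroupsWithState could approximate a watermark by expiring window state after a processing-time timeout. It assumes the same words Dataset and Window case class as above, spark.implicits._ in scope, and a hypothetical WindowState holder for the accumulated words; the "10 seconds" timeout stands in for the watermark delay and you would tune it to your needs:

import java.sql.Timestamp
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class WindowState(words: List[String])

// For each window key, accumulate words in state; once the group has been
// idle longer than the timeout, emit the final list and drop the state.
def updateWindow(
    key: Window,
    values: Iterator[(Window, String)],
    state: GroupState[WindowState]): (Window, List[String]) = {
  if (state.hasTimedOut) {
    val result = (key, state.get.words)
    state.remove()                          // evict state for the expired window
    result
  } else {
    val old = state.getOption.getOrElse(WindowState(Nil))
    val updated = WindowState(old.words ++ values.map(_._2))
    state.update(updated)
    state.setTimeoutDuration("10 seconds")  // acts like the watermark delay
    (key, updated.words)
  }
}

words.select(window($"timestamp", "10 seconds", "10 seconds"), $"word").as[(Window, String)]
  .groupByKey(_._1)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateWindow _)
  .writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .option("truncate", "false")
  .start()

Note that mapGroupsWithState requires the Update output mode, and ProcessingTimeTimeout expires groups by wall-clock time rather than event time; for true event-time semantics you would use EventTimeTimeout together with withWatermark on the input.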