From f3ede854d7082fd7d9ca4d28cea24aa976e10262 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Tue, 28 Feb 2023 08:38:07 +0900
Subject: [PATCH] [SPARK-42592][SS][DOCS] Document how to perform chained time
 window aggregations
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

This PR proposes to document how to perform chained time window aggregations. Although it is introduced as a way to perform chained time window aggregations, it can also be used more generally to apply operations which require a timestamp column against time window data.

### Why are the changes needed?

We didn't document the new functionality in the guide doc in SPARK-40925. There was a doc change in SPARK-42105, but it only mentioned that the limitations were lifted.

### Does this PR introduce _any_ user-facing change?

Yes, documentation change.

### How was this patch tested?

Created a page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed.

Screenshot: 2023-02-28 8:32:24 AM

Closes #40188 from HeartSaVioR/SPARK-42592.

Authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
---
 .../structured-streaming-programming-guide.md | 137 ++++++++++++++++++
 1 file changed, 137 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 29b2620ad7747..a9545d516fb28 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1232,6 +1232,143 @@ local partition, doing partial aggregation can still increase the performance significantly.
 You can enable `spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition` to indicate Spark to perform partial aggregation.
 
+#### Representation of the time for time window
+
+In some use cases, it is necessary to extract a timestamp representation of a time window, in order to apply operations that require a timestamp column to the time-windowed data.
+
+One example is chained time window aggregations, where users want to define another time window on top of an existing time window. Say, someone wants to aggregate 5-minute time windows into a 1-hour tumbling time window.
+
+There are two ways to achieve this:
+
+1. Use the `window_time` SQL function with the time window column as a parameter
+2. Use the `window` SQL function with the time window column as a parameter
+
+The `window_time` function produces a timestamp which represents the time for the time window.
+Users can pass the result as a parameter of the `window` function (or anywhere else requiring a timestamp) to perform operations on a time window which require a timestamp.
+
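Conceptually, `window_time` maps a window to a timestamp that still falls inside that window (the window end minus one microsecond), which is why its result can be re-windowed into a larger window. A minimal plain-Python sketch of this arithmetic (not the Spark API; the helper names are illustrative, and the epoch-based tumbling assignment is an assumption matching tumbling-window semantics):

```python
from datetime import datetime, timedelta

def window_time(start: datetime, end: datetime) -> datetime:
    # Representative timestamp of a [start, end) window: the window end
    # minus 1 microsecond, so it still lies inside the window.
    return end - timedelta(microseconds=1)

def tumbling_window(ts: datetime, duration: timedelta):
    # Assign a timestamp to its enclosing tumbling window [start, start + duration),
    # with windows aligned to the Unix epoch.
    epoch = datetime(1970, 1, 1)
    start = ts - (ts - epoch) % duration
    return (start, start + duration)

# A 5-minute window [09:05, 09:10) ...
ts = window_time(datetime(2023, 2, 28, 9, 5), datetime(2023, 2, 28, 9, 10))
# ... re-windowed into the enclosing 1-hour tumbling window
hour_window = tumbling_window(ts, timedelta(hours=1))
print(ts)           # 2023-02-28 09:09:59.999999
print(hour_window)  # the [09:00, 10:00) window
```

Because the representative timestamp is strictly inside the window, every 5-minute window lands in exactly one 1-hour bucket.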
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window(window_time($"window"), "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window(functions.window_time(windowedCounts.col("window")), "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(window_time(windowedCounts.window), "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+
+</div>
+
+The `window` function takes not only a timestamp column but also a time window column. This is very useful for cases where users want to apply chained time window aggregations.
+
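Either way, the re-bucketing the chained aggregation performs can be sketched in plain Python (not the Spark API; the helper name and input counts are illustrative): per-5-minute counts are grouped by their enclosing 1-hour tumbling window and summed.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def enclosing_start(ts: datetime, duration: timedelta) -> datetime:
    # Start of the epoch-aligned tumbling window of `duration` that contains ts.
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % duration

# Output of the first aggregation: (5-minute window start, word) -> count
five_min_counts = {
    (datetime(2023, 2, 28, 9, 0), "spark"): 3,
    (datetime(2023, 2, 28, 9, 55), "spark"): 2,
    (datetime(2023, 2, 28, 10, 0), "spark"): 4,
}

# Second aggregation: group by the enclosing 1-hour window and sum the counts
hourly_counts = defaultdict(int)
for (w_start, word), count in five_min_counts.items():
    hourly_counts[(enclosing_start(w_start, timedelta(hours=1)), word)] += count

# The 09:00 hour gets 3 + 2 = 5; the 10:00 hour gets 4
print(dict(hourly_counts))
```

Note this only composes cleanly when the larger window duration is a multiple of the smaller one, so each small window falls entirely inside one large window.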
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window($"window", "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window(windowedCounts.col("window"), "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(windowedCounts.window, "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+
+</div>
+
 ##### Conditions for watermarking to clean aggregation state
 {:.no_toc}