Skip to content

Commit

Permalink
[SPARK-42592][SS][DOCS] Document how to perform chained time window a…
Browse files Browse the repository at this point in the history
…ggregations

### What changes were proposed in this pull request?

This PR proposes to document how to perform chained time window aggregations. Although it is introduced as a way to perform chained time window aggregations, it can be also used "generally" to apply operations which require timestamp column against the time window data.

### Why are the changes needed?

We didn't document the new functionality in the guide doc in SPARK-40925. There was a doc change SPARK-42105, but it only mentioned the unblock of limitations.

### Does this PR introduce _any_ user-facing change?

Yes, documentation change.

### How was this patch tested?

Created a page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed.
Screenshot:

<img width="611" alt="스크린샷 2023-02-28 오전 8 32 24" src="https://user-images.githubusercontent.com/1317309/221713232-3ea906ce-23f6-4293-82c0-de1e69ea1ee8.png">

Closes apache#40188 from HeartSaVioR/SPARK-42592.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Feb 27, 2023
1 parent 27ad583 commit f3ede85
Showing 1 changed file with 137 additions and 0 deletions.
137 changes: 137 additions & 0 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,143 @@ local partition, doing partial aggregation can still increase the performance si

You can enable `spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition` to indicate Spark to perform partial aggregation.

#### Representation of the time for time window

In some use cases, it is necessary to extract the representation of the time for time window, to apply operations requiring timestamp to the time windowed data.
One example is chained time window aggregations, where users want to define another time window against the time window. Say, someone wants to aggregate 5 minutes time windows as 1 hour tumble time window.

There are two ways to achieve this, like below:

1. Use `window_time` SQL function with time window column as parameter
2. Use `window` SQL function with time window column as parameter

`window_time` function will produce a timestamp which represents the time for time window.
User can pass the result to the parameter of `window` function (or anywhere requiring timestamp) to perform operation(s) with time window which requires timestamp.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window(window_time($"window"), "1 hour"),
$"word"
).count()
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window(functions.window_time("window"), "1 hour"),
windowedCounts.col("word")
).count();
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">
{% highlight python %}
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
{% endhighlight %}

</div>
</div>

`window` function does not only take timestamp column, but also take the time window column. This is very useful for cases where users want to apply chained time window aggregations.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window("window", "1 hour"),
windowedCounts.col("word")
).count();
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">
{% highlight python %}
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(windowedCounts.window, "1 hour"),
windowedCounts.word
).count()
{% endhighlight %}

</div>
</div>

##### Conditions for watermarking to clean aggregation state
{:.no_toc}

Expand Down

0 comments on commit f3ede85

Please sign in to comment.