[SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

### What changes were proposed in this pull request?

This PR proposes to add examples of unblocked workloads after SPARK-42376, which unblocks a stream-stream time interval join followed by a stateful operator.

### Why are the changes needed?

We'd like to remove the description of limitations which no longer exist, and provide code examples so that users can get a sense of how to use the functionality.

### Does this PR introduce _any_ user-facing change?

Yes, documentation change.

### How was this patch tested?

Created the page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed the rendering.

Screenshots:

> Scala

![Screenshot 2023-03-07 9:39:36 PM](https://user-images.githubusercontent.com/1317309/223424683-e7f7e721-a0fa-4e3c-a8f0-139d060dd045.png)

> Java

![Screenshot 2023-03-07 9:39:28 PM](https://user-images.githubusercontent.com/1317309/223424706-b4da49c1-f088-4513-85d6-8750b89dac56.png)

> Python

![Screenshot 2023-03-07 9:37:03 PM](https://user-images.githubusercontent.com/1317309/223424412-c12500cc-946f-4e09-8b0c-6ceed5b3aeee.png)

Closes apache#40215 from HeartSaVioR/SPARK-42591.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Mar 7, 2023
1 parent 51504e4 commit 27c34bd
Showing 1 changed file with 126 additions and 4 deletions.
130 changes: 126 additions & 4 deletions docs/structured-streaming-programming-guide.md
@@ -1928,12 +1928,134 @@ Additional details on supported joins:

- As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

- - As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-   what cannot be used.
-   - Cannot use streaming aggregations before joins.
-   - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.

- You cannot use mapGroupsWithState and flatMapGroupsWithState before and after joins.

In append output mode, you can construct a query having non-map-like operations, e.g. aggregation, deduplication, or another stream-stream join, before and/or after a join.

For example, here is a time window aggregation in both streams followed by a stream-stream join on the event time window:

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}

val clicksWindow = clicksWithWatermark
  .groupBy(window($"clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window($"impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}

Dataset<Row> clicksWindow = clicksWithWatermark
.groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
.count();

Dataset<Row> impressionsWindow = impressionsWithWatermark
.groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
.count();

clicksWindow.join(impressionsWindow, "window", "inner");

{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

{% endhighlight %}

</div>
</div>
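As noted above, append is the only supported output mode for queries with stream-stream joins. Here is a minimal sketch of writing the joined result out, using the `clicksWindow` and `impressionsWindow` streams defined just above; the console sink and its option are illustrative choices, not part of the original example:

{% highlight scala %}

// Sketch only: write the windowed join result with a console sink for illustration.
// Stream-stream joins are supported only in append output mode.
val joinedWindows = clicksWindow.join(impressionsWindow, Seq("window"), "inner")

val query = joinedWindows.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

{% endhighlight %}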

Here is another example: a stream-stream join with a time range join condition, followed by a time window aggregation:

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}

val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);

joined
.groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
.count();

{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()

{% endhighlight %}

</div>
</div>
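The two examples above use aggregation before and after the join. Deduplication can likewise precede a stream-stream join; below is a minimal sketch, assuming the `impressionsWithWatermark` and `clicksWithWatermark` streams defined earlier in this guide:

{% highlight scala %}

// Sketch only: drop duplicate events in each input stream, then apply the
// time interval join from the example above.
val dedupedImpressions = impressionsWithWatermark
  .dropDuplicates("impressionAdId", "impressionTime")

val dedupedClicks = clicksWithWatermark
  .dropDuplicates("clickAdId", "clickTime")

dedupedImpressions.join(
  dedupedClicks,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  "inner"
)

{% endhighlight %}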

### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
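For instance, a minimal sketch in Scala, assuming a streaming DataFrame with a unique `guid` column and an `eventTime` column:

{% highlight scala %}

val streamingDf = spark.readStream. ...  // assumed columns: guid, eventTime

// Without watermark: state for previously seen guids is kept indefinitely
streamingDf.dropDuplicates("guid")

// With watermark: the event-time column bounds how long deduplication state is kept
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")

{% endhighlight %}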
