[SPARK-42105][SS][DOCS] Reflect the change of SPARK-40925 to SS guide doc

### What changes were proposed in this pull request?

This PR proposes to update the Structured Streaming guide doc to reflect the change of SPARK-40925.

### Why are the changes needed?

SPARK-40925 addressed the majority of the limitations of the global watermark, but the doc has not yet been updated.

### Does this PR introduce _any_ user-facing change?

Yes, documentation fix.

### How was this patch tested?

Built the doc page via bundle. Here's a screenshot.

<img width="979" alt="screenshot-SPARK-42105-update" src="https://user-images.githubusercontent.com/1317309/213628538-330bb326-4dd4-4212-9687-157ffac32429.png">

Closes apache#39662 from HeartSaVioR/SPARK-42105.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Jan 20, 2023
1 parent 9c53845 commit db06f3e
Showing 1 changed file with 7 additions and 33 deletions.
40 changes: 7 additions & 33 deletions docs/structured-streaming-programming-guide.md
@@ -1834,20 +1834,23 @@ Though Spark cannot check and force it, the state function should be implemented
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.

- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

- Limit and take the first N rows are not supported on streaming Datasets.

- Distinct operations on streaming Datasets are not supported.

- Deduplication is not supported after aggregation on streaming Datasets.

- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

- A few types of outer joins on streaming Datasets are not supported. See the
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
for more details.

- Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode.
  - In addition, the operations below followed by another stateful operation are not supported in Append mode.
    - stream-stream time interval join (inner/outer)
    - flatMapGroupsWithState
  - A known workaround is to split your streaming query into multiple queries, each having a single stateful operation,
    and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

- `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy().count()` which returns a streaming Dataset containing a running count.
@@ -1863,35 +1866,6 @@ For example, sorting on the input stream is not supported, as it requires keepin
track of all the data received in the stream. This is therefore fundamentally hard to execute
efficiently.

### Limitation of global watermark

In Append mode, if a stateful operation emits rows older than the current watermark plus the allowed late record delay,
they will be "late rows" in downstream stateful operations (as Spark uses a global watermark). Note that these rows may be discarded.
This is a limitation of a global watermark, and it could potentially cause a correctness issue.
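
To make the failure mode concrete, here is a toy model in plain Python. It is not Spark code. It assumes a simplified lateness rule (a row is late when its event time is at or below the watermark) and a hypothetical upstream aggregation that emits a window's result, stamped with the window end, only after the watermark has passed that window. `Row`, `split_by_watermark`, and all values are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    key: str
    event_time: int  # seconds; illustrative unit


def split_by_watermark(rows, watermark):
    """Partition rows into (kept, dropped) under the simplified rule
    that a row with event_time <= watermark is late (an assumption)."""
    kept = [r for r in rows if r.event_time > watermark]
    dropped = [r for r in rows if r.event_time <= watermark]
    return kept, dropped


# One watermark value is shared by every stateful operator in the query.
global_watermark = 12

# Hypothetical upstream output: the window [0, 10) is emitted only once the
# watermark has passed 10, and the emitted row carries event time 10.
upstream_output = [Row(key="window[0,10)", event_time=10)]

# The downstream stateful operator compares against the same watermark,
# so the row it has just received already counts as late.
kept, dropped = split_by_watermark(upstream_output, global_watermark)
```

In this sketch the downstream operator keeps nothing and drops the single upstream row, which is the silent data loss the paragraph above warns about.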

Spark checks the logical plan of the query and logs a warning when it detects such a pattern.

Any stateful operation placed after any of the stateful operations below can have this issue:

* streaming aggregation in Append mode
* stream-stream outer join
* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)

As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
emits late rows if the operator uses Append mode.

Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:

1. On Spark UI: check the metrics in the stateful operator nodes on the query execution details page in the SQL tab
2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProgressEvent.

Please note that "numRowsDroppedByWatermark" represents the number of rows "dropped" by the watermark, which is not always the same as the count of "late input rows" for the operator.
It depends on the implementation of the operator - e.g. streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs,
hence the number is not the same as the number of original input rows. In practice, just check whether the value is zero or non-zero.
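
The zero-or-non-zero check can be sketched in plain Python against a progress report that has already been converted to a dictionary. This is a minimal sketch: it assumes the dictionary mirrors the JSON form of a streaming query progress report, with a "stateOperators" list whose entries carry "numRowsDroppedByWatermark". The helper name `operators_dropping_rows` and the sample values are hypothetical.

```python
def operators_dropping_rows(progress):
    """Return (operator_index, dropped_count) for every stateful operator
    whose numRowsDroppedByWatermark is non-zero in the given progress dict."""
    flagged = []
    for index, op in enumerate(progress.get("stateOperators", [])):
        dropped = op.get("numRowsDroppedByWatermark", 0)
        if dropped > 0:
            flagged.append((index, dropped))
    return flagged


# Made-up sample shaped like a progress report; not output from a real query.
sample_progress = {
    "batchId": 7,
    "stateOperators": [
        {"numRowsTotal": 120, "numRowsDroppedByWatermark": 0},
        {"numRowsTotal": 45, "numRowsDroppedByWatermark": 3},
    ],
}

flagged = operators_dropping_rows(sample_progress)
for index, dropped in flagged:
    print(f"stateful operator #{index} dropped {dropped} row(s) by watermark")
```

Because the count is not always the number of late input rows, the helper only reports which operators are non-zero; treating the exact value as an input-row count would be misleading.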

There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

### State Store

State store is a versioned key-value store which provides both read and write operations. In