From db06f3e0030726bbbb2476f9c985fcf97e0a0bb6 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Fri, 20 Jan 2023 17:04:30 +0900
Subject: [PATCH] [SPARK-42105][SS][DOCS] Reflect the change of SPARK-40925 to SS guide doc

### What changes were proposed in this pull request?

This PR proposes to update Structured Streaming guide doc to reflect the change of SPARK-40925.

### Why are the changes needed?

SPARK-40925 addressed majority of limitation on global watermark, but we still haven't updated the doc.

### Does this PR introduce _any_ user-facing change?

Yes, documentation fix.

### How was this patch tested?

Built the doc page via bundle. Here's a screenshot.

screenshot-SPARK-42105-update

Closes #39662 from HeartSaVioR/SPARK-42105.

Authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
---
 .../structured-streaming-programming-guide.md | 40 ++++---------------
 1 file changed, 7 insertions(+), 33 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 78176c9cb8e2b..29b2620ad7747 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1834,20 +1834,23 @@ Though Spark cannot check and force it, the state function should be implemented
 There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
 Some of them are as follows.
 
-- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
-
 - Limit and take the first N rows are not supported on streaming Datasets.
 
 - Distinct operations on streaming Datasets are not supported.
 
-- Deduplication operation is not supported after aggregation on a streaming Datasets.
-
 - Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
 
 - Few types of outer joins on streaming Datasets are not supported.
   See the support matrix in the Join Operations section for more details.
 
+- Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode.
+  - In addition, below operations followed by other stateful operation is not supported in Append mode.
+    - stream-stream time interval join (inner/outer)
+    - flatMapGroupsWithState
+  - A known workaround is to split your streaming query into multiple queries having a single stateful operation per each query,
+    and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
+
 In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
 
 - `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy().count()` which returns a streaming Dataset containing a running count.
@@ -1863,35 +1866,6 @@ For example, sorting on the input stream is not supported, as it requires keepin
 track of all the data received in the stream. This is therefore fundamentally hard to execute
 efficiently.
 
-### Limitation of global watermark
-
-In Append mode, if a stateful operation emits rows older than current watermark plus allowed late record delay,
-they will be "late rows" in downstream stateful operations (as Spark uses global watermark). Note that these rows may be discarded.
-This is a limitation of a global watermark, and it could potentially cause a correctness issue.
-
-Spark will check the logical plan of query and log a warning when Spark detects such a pattern.
-
-Any of the stateful operation(s) after any of below stateful operations can have this issue:
-
-* streaming aggregation in Append mode
-* stream-stream outer join
-* `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)
-
-As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
-emits late rows if the operator uses Append mode.
-
-Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
-
-1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
-2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProcessEvent.
-
-Please note that "numRowsDroppedByWatermark" represents the number of "dropped" rows by watermark, which is not always same as the count of "late input rows" for the operator.
-It depends on the implementation of the operator - e.g. streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs,
-hence the number is not same as the number of original input rows. You'd like to just check the fact whether the value is zero or non-zero.
-
-There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
-end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
-
 ### State Store
 
 State store is a versioned key-value store which provides both read and write operations. In