Skip to content

Commit

Permalink
[SPARK-42376][SS] Introduce watermark propagation among operators
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to introduce watermark propagation among operators via simulation, which enables the workload of "stream-stream time interval join followed by stateful operator".

As of now, Spark considers all stateful operators to have same input watermark and output watermark, which is insufficient to handle stream-stream time interval join. It can delay joined output more than global watermark, based on the join criteria. (e.g. `leftTime BETWEEN rightTime - INTERVAL 30 seconds AND rightTime + INTERVAL 40 seconds`). To address this, the join operator should be able to produce "delayed" watermark to the downstream operator. That said, Spark has to "propagate" watermark among operators, flowing through leaf node(s) to root (going downstream).

This PR introduces a new interface `WatermarkPropagator` which performs simulation of watermark propagation based on the watermark. There are three implementations for this interface:

1. NoOpWatermarkPropagator: Do nothing. This is used for initializing dummy IncrementalExecution.
2. UseSingleWatermarkPropagator: Uses a single global watermark for late events and eviction. This is used for compatibility mode (`spark.sql.streaming.statefulOperator.allowMultiple` to `false`).
3. PropagateWatermarkSimulator: simulates propagation of watermark among operators.

The simulation algorithm used in `PropagateWatermarkSimulator` traverses the physical plan tree via post-order (children first) to calculate (input watermark, output watermark) for all nodes. For each node, below logic is applied:

- Input watermark for specific node is decided by `min(input watermarks from all children)`.
   - Children providing no input watermark are excluded.
   - If there is no valid input watermark from children, it's considered as there is no input watermark.
 - Output watermark for specific node is decided as following:
   - watermark nodes: origin watermark value (global watermark).
   - stateless nodes: same as input watermark.
   - stateful nodes: the return value of `op.produceOutputWatermark(input watermark)`. (if there is no input watermark, there is no output watermark)

Once the algorithm traverses the physical plan tree, the association between stateful operator and input watermark will be constructed. The association is cached after calculation and being used across microbatches, till Spark determines the association as no longer to be used.

As mentioned like `op.produceOutputWatermark()` in above, this PR also adds a new method `produceOutputWatermark` in StateStoreWriter, which requires each stateful operator to calculate output watermark based on given input watermark. In most cases, this is same as the criteria of state eviction, as most stateful operators produce the output from two different kinds:

1. without buffering (event time > input watermark)
2. with buffering (state)

The state eviction happens when event time exceeds a "certain threshold of timestamp", which denotes a lower bound of event time values for output (output watermark). Since most stateful operators construct the predicate for state eviction based on watermark in batch planning phase, they can produce an output watermark once Spark provides an input watermark.

Please refer to the walkthrough code comment for the test case of `stream-stream time interval left outer join -> aggregation, append mode`.

There are several additional decisions made by this PR which introduces backward incompatibility.

1. Re-definition of watermark will be disallowed.

Technically, each watermark node can track its own value of watermark and PropagateWatermarkSimulator can propagate these values correctly. (multiple origins) While this may help to accelerate processing faster stream (as all watermarks don't need to follow the slowest one till join/union), this involves more complicated questions on UX perspective, as all UX about watermark is based on global watermark. This seems harder to address, hence this PR proposes to retain the global watermark as it is.

Since we want to produce watermark as the single origin value, redefinition of watermark does not make sense. Consider stream-stream time interval join followed by another watermark node. Which is the right value of output watermark for another watermark node? delayed watermark, or global watermark?

2. stateful operator will not allow multiple event time columns being defined in the input DataFrame.

The output of stream-stream join may have two event time columns, which is ambiguous on late record filtering and eviction. Currently the first appeared event time column has been picked up for late record filtering and eviction, which is ambiguous to reason about the correctness. After this PR, Spark will throw an exception. The downstream operator has to pick up only one of event time column to continue.

Turning off the flag `spark.sql.streaming.statefulOperator.allowMultiple` will restore the old behavior from the above.

(https://issues.apache.org/jira/browse/SPARK-42549 is filed to remove this limitation later.)

### Why are the changes needed?

stream-stream time interval join followed by stateful operator is not supported yet, and this PR unblocks it.

### Does this PR introduce _any_ user-facing change?

Yes, here is a list of user facing changes (some are backward incompatibility changes, though we have compatibility flag):

- stream-stream time-interval join followed by stateful operator will be allowed.
- Re-definition of watermark will be disallowed.
- stateful operator will not allow multiple event time columns being defined in the input DataFrame.

### How was this patch tested?

New & modified test cases.

Closes apache#39931 from HeartSaVioR/SPARK-42376.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Mar 7, 2023
1 parent 201e08c commit 8e3b9d4
Show file tree
Hide file tree
Showing 16 changed files with 1,274 additions and 176 deletions.
4 changes: 1 addition & 3 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2067,9 +2067,7 @@ Some of them are as follows.
for more details.

- Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode.
- In addition, below operations followed by other stateful operation is not supported in Append mode.
- stream-stream time interval join (inner/outer)
- flatMapGroupsWithState
- In addition, mapGroupsWithState/flatMapGroupsWithState operation followed by other stateful operation is not supported in Append mode.
- A known workaround is to split your streaming query into multiple queries having a single stateful operation per each query,
and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
Expand Down Expand Up @@ -84,9 +83,6 @@ object UnsupportedOperationChecker extends Logging {
*/
private def ifCannotBeFollowedByStatefulOperation(
p: LogicalPlan, outputMode: OutputMode): Boolean = p match {
case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
left.isStreaming && right.isStreaming &&
otherCondition.isDefined && hasRangeExprAgainstEventTimeCol(otherCondition.get)
// FlatMapGroupsWithState configured with event time
case f @ FlatMapGroupsWithState(_, _, _, _, _, _, _, _, _, timeout, _, _, _, _, _, _)
if f.isStreaming && timeout == GroupStateTimeout.EventTimeTimeout => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ case class EventTimeWatermark(
final override val nodePatterns: Seq[TreePattern] = Seq(EVENT_TIME_WATERMARK)

// Update the metadata on the eventTime column to include the desired delay.
// This is not allowed by default - WatermarkPropagator will throw an exception. We keep the
// logic here because we also maintain the compatibility flag. (See
// SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE for details.)
// TODO: Disallow updating the metadata once we remove the compatibility flag.
override val output: Seq[Attribute] = child.output.map { a =>
if (a semanticEquals eventTime) {
val delayMs = EventTimeWatermark.getDelayMs(delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,16 +541,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
isMapGroupsWithState = true, GroupStateTimeout.ProcessingTimeTimeout(), streamRelation)),
outputMode = Append)

// stream-stream relation, time interval join can't be followed by any stateful operators
assertFailOnGlobalWatermarkLimit(
// stream-stream relation, time interval join can be followed by any stateful operators
assertPassOnGlobalWatermarkLimit(
"multiple stateful ops - stream-stream time-interval join followed by agg",
Aggregate(Nil, aggExprs("c"),
streamRelation.join(streamRelation, joinType = Inner,
condition = Some(attribute === attribute &&
attributeWithWatermark > attributeWithWatermark + 10))),
outputMode = Append)

// stream-stream relation, only equality join can be followed by any stateful operators
// stream-stream relation, equality join can be followed by any stateful operators
assertPassOnGlobalWatermarkLimit(
"multiple stateful ops - stream-stream equality join followed by agg",
Aggregate(Nil, aggExprs("c"),
Expand Down Expand Up @@ -601,38 +601,29 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
outputMode = outputMode)
}

// Deduplication, if on event time column, is a stateful operator
// and cannot be placed after join
assertFailOnGlobalWatermarkLimit(
"multiple stateful ops - stream-stream time interval join followed by" +
assertPassOnGlobalWatermarkLimit(
"multiple stateful ops - stream-stream time interval join followed by " +
"dedup (with event-time)",
Deduplicate(Seq(attributeWithWatermark),
streamRelation.join(streamRelation, joinType = Inner,
condition = Some(attribute === attribute &&
attributeWithWatermark > attributeWithWatermark + 10))),
outputMode = Append)

// Deduplication, if not on event time column,
// although it is still a stateful operator,
// it can be placed after join
assertPassOnGlobalWatermarkLimit(
"multiple stateful ops - stream-stream time interval join followed by" +
"multiple stateful ops - stream-stream time interval join followed by " +
"dedup (without event-time)",
Deduplicate(Seq(att),
streamRelation.join(streamRelation, joinType = Inner,
condition = Some(attribute === attribute &&
attributeWithWatermark > attributeWithWatermark + 10))),
outputMode = Append)

// for a stream-stream join followed by a stateful operator,
// if the join is keyed on time-interval inequality conditions (inequality on watermarked cols),
// should fail.
// if the join is keyed on time-interval equality conditions -> should pass
Seq(Inner, LeftOuter, RightOuter, FullOuter).foreach {
joinType =>
assertFailOnGlobalWatermarkLimit(
assertPassOnGlobalWatermarkLimit(
s"streaming aggregation after " +
s"stream-stream $joinType join keyed on time inequality in Append mode are not supported",
s"stream-stream $joinType join keyed on time interval in Append mode are not supported",
streamRelation.join(streamRelation, joinType = joinType,
condition = Some(attributeWithWatermark === attribute &&
attributeWithWatermark < attributeWithWatermark + 10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableU
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata, WatermarkPropagator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -244,7 +244,8 @@ class QueryExecution(
// output mode does not matter since there is no `Sink`.
new IncrementalExecution(
sparkSession, logical, OutputMode.Append(), "<unknown>",
UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0))
UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),
WatermarkPropagator.noop())
} else {
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,24 @@ trait FlatMapGroupsWithStateExecBase

override def shortName: String = "flatMapGroupsWithState"

override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
timeoutConf match {
case ProcessingTimeTimeout =>
true // Always run batches to process timeouts
case EventTimeTimeout =>
// Process another non-data batch only if the watermark has changed in this executed plan
eventTimeWatermarkForEviction.isDefined &&
newMetadata.batchWatermarkMs > eventTimeWatermarkForEviction.get
newInputWatermark > eventTimeWatermarkForEviction.get
case _ =>
false
}
}

// There is no guarantee that any of the column in the output is bound to the watermark. The
// user function is quite flexible. Hence Spark does not support the stateful operator(s) after
// (flat)MapGroupsWithState.
override def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None

/**
* Process data by applying the user defined function on a per partition basis.
*
Expand Down
Loading

0 comments on commit 8e3b9d4

Please sign in to comment.