[SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
### What changes were proposed in this pull request?

This PR proposes to introduce a new variant of dropDuplicates with the following characteristics, compared to the existing dropDuplicates:

* Weaker constraints on the subset (key)
  * Does not require an event time column in the subset.
* Looser semantics on deduplication
  * Only guarantees deduplication of events within the watermark delay.

Since the new API leverages event time, it has the following new requirements:

* The watermark must be defined in the streaming DataFrame.
* The event time column must be defined in the streaming DataFrame.

More specifically, once the operator processes the first-arriving event, events arriving within the watermark delay of that first event will be deduplicated.
(Technically, the expiration time is "the event time of the first arrived event + the watermark delay threshold", to match against future events.)

Users are encouraged to set the delay threshold of the watermark longer than the maximum timestamp difference among duplicate events. (If they are unsure, they can alternatively set the delay threshold large enough, e.g. 48 hours.)
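
To illustrate the semantics, a minimal sketch (the stream and the `guid`/`eventTime` column names here are illustrative, not part of the actual changes):

```scala
// Hypothetical streaming DataFrame with columns `guid` and `eventTime`.
val deduped = streamingDf
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicatesWithinWatermark("guid")

// Intended behavior, per the semantics above:
// - The first event for guid "a" arrives with eventTime 10:00.
//   State for "a" is kept until the watermark passes 10:10
//   (event time of the first event + delay threshold).
// - Any later event for guid "a" processed before that expiration is dropped as a
//   duplicate, even if its event time differs slightly.
// - Once the watermark advances past 10:10, the state for "a" is evicted and a
//   subsequent event for "a" would be emitted again.
```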

For batch DataFrames, this is equivalent to dropDuplicates.

This PR also updates the SS guide doc to introduce the new feature; screenshots below:

<img width="747" alt="스크린샷 2023-04-06 오전 11 09 12" src="https://user-images.githubusercontent.com/1317309/230254868-7fe76175-5883-4700-b018-d85d851799cb.png">
<img width="749" alt="스크린샷 2023-04-06 오전 11 09 18" src="https://user-images.githubusercontent.com/1317309/230254874-a754cdfd-2832-41dd-85b6-291f05eccb3d.png">
<img width="752" alt="스크린샷 2023-04-06 오전 11 09 23" src="https://user-images.githubusercontent.com/1317309/230254876-7fd7b3b1-f59d-481f-8249-5a4ae556c7cf.png">
<img width="751" alt="스크린샷 2023-04-06 오전 11 09 29" src="https://user-images.githubusercontent.com/1317309/230254880-79b158ca-3403-46a6-be4a-46618ec749db.png">

### Why are the changes needed?

The existing dropDuplicates API does not address a valid use case in streaming queries.

There are many cases where the event times are not exactly the same even though the events themselves are duplicates. One example is duplicate events produced by a non-idempotent writer, where the event time is issued on the producer/broker side. Another example is when the value of the event time is unstable and users want to use an alternative timestamp, e.g. ingestion time.

In these cases, users have to exclude the event time column from the deduplication subset, but then the operator is unable to evict state, leading to indefinitely growing state.

To allow state eviction without requiring the event time column to be part of the deduplication subset, we need to loosen the semantics of the API, which warrants a new API.
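
As a rough sketch of the trade-off with the existing API (column names `guid` and `eventTime` are illustrative):

```scala
// Existing API, event time column included in the subset: state can be evicted via the
// watermark, but events with slightly different event times are NOT treated as duplicates.
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicates("guid", "eventTime")

// Existing API, event time column excluded from the subset: duplicates are caught,
// but the operator can never evict state, so state grows indefinitely.
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicates("guid")

// New API proposed here: duplicates are caught using only `guid`, and state is still
// evicted once the watermark passes "event time of the first event + delay threshold".
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
```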

### Does this PR introduce _any_ user-facing change?

Yes, this introduces a new public API, dropDuplicatesWithinWatermark.

### How was this patch tested?

New test suite.
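
For reference, a sketch of what such a test could look like with Spark's `StreamTest` DSL and `MemoryStream`; the suite name, data, and assertions below are illustrative and may differ from the actual suite added in this PR:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}

// Hypothetical suite name, for illustration only.
class DropDuplicatesWithinWatermarkSketchSuite extends StreamTest {
  import testImplicits._

  test("deduplicate on guid within the watermark delay") {
    val input = MemoryStream[(String, Int)]
    val result = input.toDS()
      .toDF("guid", "eventTimeSecs")
      .selectExpr("guid", "timestamp_seconds(eventTimeSecs) AS eventTime")
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicatesWithinWatermark("guid")
      .select("guid")

    testStream(result, OutputMode.Append())(
      // "a" arrives twice with different event times within the 10-second delay threshold,
      // so only one row per guid is expected in the output.
      AddData(input, ("a", 1), ("a", 5), ("b", 2)),
      CheckNewAnswer("a", "b")
    )
  }
}
```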

Closes apache#40561 from HeartSaVioR/SPARK-42931.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Apr 8, 2023
1 parent d8b720a commit 0e9e34c
Showing 13 changed files with 614 additions and 31 deletions.
@@ -2318,6 +2318,24 @@ class Dataset[T] private[sql] (
dropDuplicates(colNames)
}

def dropDuplicatesWithinWatermark(): Dataset[T] = {
dropDuplicatesWithinWatermark(this.columns)
}

def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = {
throw new UnsupportedOperationException("dropDuplicatesWithinWatermark is not implemented.")
}

def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = {
dropDuplicatesWithinWatermark(colNames.toSeq)
}

@scala.annotation.varargs
def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = {
val colNames: Seq[String] = col1 +: cols
dropDuplicatesWithinWatermark(colNames)
}

/**
* Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
* and max. If no columns are given, this function computes statistics for all numerical or
55 changes: 55 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
{% endhighlight %}

</div>

</div>

Specifically for streaming, you can deduplicate records in data streams using a unique identifier in the events, within the time range of the watermark.
For example, if you set the delay threshold of the watermark to "1 hour", duplicate events that occur within 1 hour can be correctly deduplicated.
(For more details, please refer to the API doc of [dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).)

This can be used to deal with the use case where the event time column cannot be part of the unique identifier, mostly because event times somehow differ for the same record (e.g. a non-idempotent writer where the event time is issued at write time).

Users are encouraged to set the delay threshold of the watermark longer than the maximum timestamp difference among duplicate events.

This feature requires a watermark with a delay threshold to be set on the streaming DataFrame/Dataset.

<div class="codetabs">

<div data-lang="python" markdown="1">

{% highlight python %}
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
.withWatermark("eventTime", "10 hours") \
.dropDuplicatesWithinWatermark("guid")
{% endhighlight %}

</div>

<div data-lang="scala" markdown="1">

{% highlight scala %}
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark("guid")
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> streamingDf = spark.readStream(). ...; // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark("guid");
{% endhighlight %}


</div>

</div>
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/dataframe.rst
@@ -50,6 +50,7 @@ DataFrame
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.dropDuplicatesWithinWatermark
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.dtypes
3 changes: 3 additions & 0 deletions python/pyspark/sql/connect/dataframe.py
@@ -331,6 +331,9 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":

drop_duplicates = dropDuplicates

def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
raise NotImplementedError("dropDuplicatesWithinWatermark() is not implemented.")

def distinct(self) -> "DataFrame":
return DataFrame.withPlan(
plan.Deduplicate(child=self._plan, all_columns_as_keys=True), session=self._session
56 changes: 56 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -3963,6 +3963,62 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
jdf = self._jdf.dropDuplicates(self._jseq(subset))
return DataFrame(jdf, self.sparkSession)

def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
"""Return a new :class:`DataFrame` with duplicate rows removed,
optionally only considering certain columns, within watermark.
This only works with streaming :class:`DataFrame`, and watermark for the input
:class:`DataFrame` must be set via :func:`withWatermark`.
For a streaming :class:`DataFrame`, this will keep all data across triggers as intermediate
state to drop duplicated rows. The state will be kept to guarantee the semantic, "Events
are deduplicated as long as the time distance of earliest and latest events are smaller
than the delay threshold of watermark." Users are encouraged to set the delay threshold of
watermark longer than max timestamp differences among duplicated events.
Note: too late data older than watermark will be dropped.

.. versionadded:: 3.5.0

Parameters
----------
subset : List of column names, optional
    List of columns to use for duplicate comparison (default All columns).

Returns
-------
:class:`DataFrame`
    DataFrame without duplicates.

Examples
--------
>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import timestamp_seconds
>>> df = spark.readStream.format("rate").load().selectExpr(
...     "value % 5 AS value", "timestamp")
>>> df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
DataFrame[value: bigint, time: timestamp]

Deduplicate the same rows.

>>> df.dropDuplicatesWithinWatermark() # doctest: +SKIP

Deduplicate values on 'value' columns.

>>> df.dropDuplicatesWithinWatermark(['value']) # doctest: +SKIP
"""
if subset is not None and (not isinstance(subset, Iterable) or isinstance(subset, str)):
raise PySparkTypeError(
error_class="NOT_LIST_OR_TUPLE",
message_parameters={"arg_name": "subset", "arg_type": type(subset).__name__},
)

if subset is None:
jdf = self._jdf.dropDuplicatesWithinWatermark()
else:
jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset))
return DataFrame(jdf, self.sparkSession)

def dropna(
self,
how: str = "any",
@@ -37,6 +37,10 @@ object UnsupportedOperationChecker extends Logging {
case p if p.isStreaming =>
throwError("Queries with streaming sources must be executed with writeStream.start()")(p)

case d: DeduplicateWithinWatermark =>
throwError("dropDuplicatesWithinWatermark is not supported with batch " +
"DataFrames/DataSets")(d)

case _ =>
}
}
@@ -114,6 +118,7 @@ object UnsupportedOperationChecker extends Logging {
case f: FlatMapGroupsWithState if f.isStreaming => true
case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
case d: DeduplicateWithinWatermark if d.isStreaming => true
case _ => false
}

@@ -464,6 +469,19 @@
throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
}

case d: DeduplicateWithinWatermark if d.isStreaming =>
// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = d.child.output.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}

// DeduplicateWithinWatermark requires event time column being set in the input DataFrame
if (watermarkAttributes.isEmpty) {
throwError(
"dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " +
"without watermark")(plan)
}

case c: CoGroup if c.children.exists(_.isStreaming) =>
throwError("CoGrouping with a streaming DataFrame/Dataset is not supported")

@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
d.withNewChildren(Seq(simplifyUnion(u)))
case d @ Deduplicate(_, u: Union) =>
d.withNewChildren(Seq(simplifyUnion(u)))
case d @ DeduplicateWithinWatermark(_, u: Union) =>
d.withNewChildren(Seq(simplifyUnion(u)))
}
}

@@ -1451,6 +1453,9 @@ object CombineUnions extends Rule[LogicalPlan] {
// Only handle distinct-like 'Deduplicate', where the keys == output
case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) == u.outputSet =>
Deduplicate(keys, flattenUnion(u, true))
case DeduplicateWithinWatermark(keys: Seq[Attribute], u: Union)
if AttributeSet(keys) == u.outputSet =>
DeduplicateWithinWatermark(keys, flattenUnion(u, true))
}

private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
@@ -1912,6 +1912,14 @@ case class Deduplicate(
copy(child = newChild)
}

case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) extends UnaryNode {
override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
override protected def withNewChildInternal(newChild: LogicalPlan): DeduplicateWithinWatermark =
copy(child = newChild)
}

/**
* A trait to represent the commands that support subqueries.
* This is used to allow such commands in the subquery-related checks.
123 changes: 109 additions & 14 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2999,20 +2999,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
// SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
// (the Streaming's state store depends on the `groupCols` order).
val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
colName, schema.fieldNames.mkString(", "))
}
cols
}
val groupCols = groupColsFromDropDuplicates(colNames)
Deduplicate(groupCols, logicalPlan)
}

@@ -3050,6 +3037,114 @@
dropDuplicates(colNames)
}

/**
* Returns a new Dataset with duplicate rows removed, within watermark.
*
* This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
* set via [[withWatermark]].
*
* For a streaming [[Dataset]], this will keep all data across triggers as intermediate state
* to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are
* deduplicated as long as the time distance of earliest and latest events are smaller than the
* delay threshold of watermark." Users are encouraged to set the delay threshold of watermark
* longer than max timestamp differences among duplicated events.
*
* Note: too late data older than watermark will be dropped.
*
* @group typedrel
* @since 3.5.0
*/
def dropDuplicatesWithinWatermark(): Dataset[T] = {
dropDuplicatesWithinWatermark(this.columns)
}

/**
* Returns a new Dataset with duplicate rows removed, considering only the subset of columns,
* within watermark.
*
* This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
* set via [[withWatermark]].
*
* For a streaming [[Dataset]], this will keep all data across triggers as intermediate state
* to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are
* deduplicated as long as the time distance of earliest and latest events are smaller than the
* delay threshold of watermark." Users are encouraged to set the delay threshold of watermark
* longer than max timestamp differences among duplicated events.
*
* Note: too late data older than watermark will be dropped.
*
* @group typedrel
* @since 3.5.0
*/
def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val groupCols = groupColsFromDropDuplicates(colNames)
// UnsupportedOperationChecker will fail the query if this is called with batch Dataset.
DeduplicateWithinWatermark(groupCols, logicalPlan)
}

/**
* Returns a new Dataset with duplicate rows removed, considering only the subset of columns,
* within watermark.
*
* This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
* set via [[withWatermark]].
*
* For a streaming [[Dataset]], this will keep all data across triggers as intermediate state
* to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are
* deduplicated as long as the time distance of earliest and latest events are smaller than the
* delay threshold of watermark." Users are encouraged to set the delay threshold of watermark
* longer than max timestamp differences among duplicated events.
*
* Note: too late data older than watermark will be dropped.
*
* @group typedrel
* @since 3.5.0
*/
def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = {
dropDuplicatesWithinWatermark(colNames.toSeq)
}

/**
* Returns a new Dataset with duplicate rows removed, considering only the subset of columns,
* within watermark.
*
* This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
* set via [[withWatermark]].
*
* For a streaming [[Dataset]], this will keep all data across triggers as intermediate state
* to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are
* deduplicated as long as the time distance of earliest and latest events are smaller than the
* delay threshold of watermark." Users are encouraged to set the delay threshold of watermark
* longer than max timestamp differences among duplicated events.
*
* Note: too late data older than watermark will be dropped.
*
* @group typedrel
* @since 3.5.0
*/
@scala.annotation.varargs
def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = {
val colNames: Seq[String] = col1 +: cols
dropDuplicatesWithinWatermark(colNames)
}

private def groupColsFromDropDuplicates(colNames: Seq[String]): Seq[Attribute] = {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
// SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
// (the Streaming's state store depends on the `groupCols` order).
colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
throw QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
colName, schema.fieldNames.mkString(", "))
}
cols
}
}

/**
* Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
* and max. If no columns are given, this function computes statistics for all numerical or
@@ -454,6 +454,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case Deduplicate(keys, child) if child.isStreaming =>
StreamingDeduplicateExec(keys, planLater(child)) :: Nil

case DeduplicateWithinWatermark(keys, child) if child.isStreaming =>
StreamingDeduplicateWithinWatermarkExec(keys, planLater(child)) :: Nil

case _ => Nil
}
}