Skip to content

Commit

Permalink
[SPARK-41733][SQL][SS] Apply tree-pattern based pruning for the rule ResolveWindowTime
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR proposes to apply tree-pattern based pruning to the rule ResolveWindowTime, so that the rule is only evaluated against plans that contain a WindowTime node.

### Why are the changes needed?

The rule ResolveWindowTime is unnecessarily evaluated multiple times without proper pruning.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests. New test case is added to cover SQL usage for `window_time`.

Closes apache#39247 from HeartSaVioR/SPARK-41733.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Dec 28, 2022
1 parent 5141a7c commit 87a235c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW}
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
Expand Down Expand Up @@ -287,7 +287,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
* The correct representative event time of a window is ``window.end - 1``.
* */
object ResolveWindowTime extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(WINDOW_TIME), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowTimeExpressions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, WINDOW_TIME}
import org.apache.spark.sql.types._

// scalastyle:off line.size.limit line.contains.tab
Expand Down Expand Up @@ -52,6 +53,8 @@ case class WindowTime(windowColumn: Expression)

override def dataType: DataType = child.dataType.asInstanceOf[StructType].head.dataType

final override val nodePatterns: Seq[TreePattern] = Seq(WINDOW_TIME)

override def prettyName: String = "window_time"

// This expression is replaced in the analyzer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
"org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" ::
"org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
"org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
"org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ object TreePattern extends Enumeration {
val TIME_ZONE_AWARE_EXPRESSION: Value = Value
val TRUE_OR_FALSE_LITERAL: Value = Value
val WINDOW_EXPRESSION: Value = Value
val WINDOW_TIME: Value = Value
val UNARY_POSITIVE: Value = Value
val UNPIVOT: Value = Value
val UPDATE_FIELDS: Value = Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,4 +651,31 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
)
)
}

test("window_time in SQL") {
  // Verifies that window_time() resolves when invoked from SQL text (not only
  // via the DataFrame API), exercising the ResolveWindowTime analyzer rule.
  withTempView("tmpView") {
    // Two events that land in two distinct 10-second tumbling windows.
    val events = Seq(("2016-03-27 19:38:19", 1), ("2016-03-27 19:39:25", 2))
    events.toDF("time", "value").createOrReplaceTempView("tmpView")

    val result = spark.sql(
      s"""
         |select
         |  CAST(window.start AS string), CAST(window.end AS string),
         |  CAST(window_time(window) AS string), counts
         |from
         |(
         |  select window, count(*) AS counts from tmpView
         |  group by window(time, "10 seconds")
         |  order by window.start
         |)
         |""".stripMargin)

    // window_time is defined as window.end - 1 microsecond, i.e. the last
    // instant that still belongs to the window — hence the ".999999" values.
    val expected = Seq(
      Row("2016-03-27 19:38:10", "2016-03-27 19:38:20", "2016-03-27 19:38:19.999999", 1),
      Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", "2016-03-27 19:39:29.999999", 1))

    checkAnswer(result, expected)
  }
}
}

0 comments on commit 87a235c

Please sign in to comment.