Skip to content

Commit

Permalink
[SPARK-41732][SQL][SS] Apply tree-pattern based pruning for the rule …
Browse files Browse the repository at this point in the history
…SessionWindowing

### What changes were proposed in this pull request?

This PR proposes to apply tree-pattern based pruning to the rule SessionWindowing, so that the rule is evaluated only on plans that contain a SessionWindow node.

### Why are the changes needed?

The rule SessionWindowing is unnecessarily evaluated multiple times without proper pruning.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#39245 from HeartSaVioR/SPARK-41732.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Dec 28, 2022
1 parent dba3376 commit d280684
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_WINDOW
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, LongType, Metadata, MetadataBuilder, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
Expand Down Expand Up @@ -187,7 +187,8 @@ object SessionWindowing extends Rule[LogicalPlan] {
* This also adds a marker to the session column so that downstream operators can easily find
* the session window column.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(SESSION_WINDOW), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val sessionExpressions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TreePattern}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -91,6 +92,7 @@ case class SessionWindow(timeColumn: Expression, gapDuration: Expression) extend
override def dataType: DataType = new StructType()
.add(StructField("start", children.head.dataType))
.add(StructField("end", children.head.dataType))
final override val nodePatterns: Seq[TreePattern] = Seq(SESSION_WINDOW)

// This expression is replaced in the analyzer.
override lazy val resolved = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
"org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
"org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
"org.apache.spark.sql.catalyst.analysis.SessionWindowing" ::
"org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals" ::
"org.apache.spark.sql.catalyst.analysis.TimeWindowing" ::
"org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule" ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ object TreePattern extends Enumeration {
val SCALAR_SUBQUERY: Value = Value
val SCALAR_SUBQUERY_REFERENCE: Value = Value
val SCALA_UDF: Value = Value
val SESSION_WINDOW: Value = Value
val SORT: Value = Value
val SUBQUERY_ALIAS: Value = Value
val SUM: Value = Value
Expand Down

0 comments on commit d280684

Please sign in to comment.