[SPARK-40892][SQL][SS] Loosen the requirement of window_time rule - allow multiple window_time calls

### What changes were proposed in this pull request?

This PR proposes to loosen the requirement of the window_time rule to allow multiple distinct window_time calls. After this change, users can call the window_time function against different windows in the same logical node (select, where, etc.).

Given that we now allow multiple window_time calls in a projection, we can no longer use the reserved column name "window_time" for the output. This PR instead uses the SQL representation of the WindowTime expression as the column name, which distinguishes each distinct function call.
(This differs from time window/session window; arguably their naming is incorrect, but we can't fix them now since the change would break backward compatibility...)
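For illustration, a minimal sketch of the now-permitted usage, adapted from the new test case in this PR (the `df`/`time` column names come from that test):

```scala
import org.apache.spark.sql.functions.{expr, window, window_time}
import spark.implicits._ // for the $"..." column syntax

// Two distinct windows, each with its own window_time call, in one select.
// Before this change, ResolveWindowTime accepted only a single distinct call.
val df2 = df
  .withColumn("time2", expr("time - INTERVAL 15 minutes"))
  .select(window($"time", "10 seconds").as("window1"), $"time2")
  .select($"window1", window($"time2", "10 seconds").as("window2"))

df2.select(
  window_time($"window1"), // resolved as column "window_time(window1)"
  window_time($"window2")) // resolved as column "window_time(window2)"
```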

### Why are the changes needed?

The window_time rule followed the existing rules for time window / session window, which allow only a single function call in the same projection (strictly speaking, multiple calls with identical parameters are treated as a single call).

For the time window / session window rules, this restriction makes sense: allowing multiple calls would produce a cartesian product of rows (although Spark could handle it). But since window_time produces only one value per row, the restriction no longer makes sense for it.
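For contrast, a ScalaTest-style sketch of the pattern that remains disallowed for time windows; this is essentially what the old test (repurposed by this PR) asserted:

```scala
// Two distinct time windows in one projection are still rejected, since each
// input row would map onto multiple windows (a cartesian product of rows).
val e = intercept[AnalysisException] {
  df.withColumn("time2", expr("time - INTERVAL 5 minutes"))
    .select(
      window($"time", "10 seconds").as("window1"),
      window($"time2", "10 seconds").as("window2"))
}
assert(e.getMessage.contains("Multiple time/session window expressions"))
```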

### Does this PR introduce _any_ user-facing change?

Yes: it changes the resulting column name of a window_time function call. However, the function has not been released yet, so no released behavior changes.
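A minimal sketch of the visible difference, reusing `df2` from the sketch above and matching the column names asserted in the new test:

```scala
// The output column is now named by the expression's SQL form rather than the
// fixed name "window_time"; an explicit alias still works as before.
val names = df2.select(
  window_time($"window1"),
  window_time($"window2").as("wt2_aliased")).schema.fieldNames
// names == Array("window_time(window1)", "wt2_aliased")
```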

### How was this patch tested?

New test case.

Closes apache#38361 from HeartSaVioR/SPARK-40892.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Oct 28, 2022
1 parent 5793e5f commit 39e8359
Showing 3 changed files with 75 additions and 55 deletions.
```diff
@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       val windowTimeExpressions =
         p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
 
-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
 
-        val windowTime = windowTimeExpressions.head
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }
 
-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              "The input is not a correct window column: $windowTime", plan = Some(p))
+          }
 
-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = Some(p))
-        }
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
 
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val colName = windowTime.sql
+
+          val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()
 
-        val attr = AttributeReference(
-          "window_time", windowTime.dataType, metadata = newMetadata)()
+          // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+          // it is, it is going to be bound to the different window even if we apply the same window
+          // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+          // correct window range.
+          val subtractExpr =
+            PreciseTimestampConversion(
+              Subtract(PreciseTimestampConversion(
+                GetStructField(windowTime.windowColumn, 1),
+                windowTime.dataType, LongType), Literal(1L)),
+              LongType,
+              windowTime.dataType)
 
-        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
-        // it is, it is going to be bound to the different window even if we apply the same window
-        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
-        // correct window range.
-        val subtractExpr =
-          PreciseTimestampConversion(
-            Subtract(PreciseTimestampConversion(
-              GetStructField(windowTime.windowColumn, 1),
-              windowTime.dataType, LongType), Literal(1L)),
-            LongType,
-            windowTime.dataType)
+          val newColumn = Alias(subtractExpr, colName)(
+            exprId = attr.exprId, explicitMetadata = Some(newMetadata))
 
-        val newColumn = Alias(subtractExpr, "window_time")(
-          exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+          windowTime -> (attr, newColumn)
+        }.toMap
 
         val replacedPlan = p transformExpressions {
-          case w: WindowTime => attr
+          case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
         }
 
-        replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil)
+        val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
+        replacedPlan.withNewChildren(
+          Project(newColumnsToAdd ++: child.output, child) :: Nil)
       } else {
         p // Return unchanged. Analyzer will throw exception later
       }
```
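As an aside on the NOTE kept in the rule above: window_time evaluates to `window.end - 1 microsecond`, because window.end is the exclusive upper bound and would otherwise fall into the *next* window under the same window spec. A small sketch of the arithmetic in plain java.time, independent of Spark internals:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// For a 10-second window [19:38:10, 19:38:20), the exclusive end is 19:38:20,
// which the same window spec would assign to the following window.
val end = Instant.parse("2016-03-27T19:38:20Z")

// window_time = end - 1 microsecond: 2016-03-27T19:38:19.999999Z, which still
// falls inside the originating window.
val windowTime = end.minus(1, ChronoUnit.MICROS)
```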
```diff
@@ -345,7 +345,7 @@
 | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct<weekday(2009-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> |
 | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 5):bigint> |
-| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> |
+| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time(window):timestamp,cnt:bigint> |
 | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 2):bigint> |
 | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct<year(2016-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable())):array<struct<y:string,x:int>>> |
@@ -413,4 +413,4 @@
 | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
\ No newline at end of file
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
```
```diff
@@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
     ).toDF("time")
 
-    val e = intercept[AnalysisException] {
-      df
-        .withColumn("time2", expr("time - INTERVAL 5 minutes"))
-        .select(
-          window($"time", "10 seconds").as("window1"),
-          window($"time2", "10 seconds").as("window2")
-        )
-        .select(
-          $"window1.end".cast("string"),
-          window_time($"window1").cast("string"),
-          $"window2.end".cast("string"),
-          window_time($"window2").cast("string")
-        )
-    }
-    assert(e.getMessage.contains(
-      "Multiple time/session window expressions would result in a cartesian product of rows, " +
-        "therefore they are currently not supported"))
+    val df2 = df
+      .withColumn("time2", expr("time - INTERVAL 15 minutes"))
+      .select(window($"time", "10 seconds").as("window1"), $"time2")
+      .select($"window1", window($"time2", "10 seconds").as("window2"))
+
+    checkAnswer(
+      df2.select(
+        $"window1.end".cast("string"),
+        window_time($"window1").cast("string"),
+        $"window2.end".cast("string"),
+        window_time($"window2").cast("string")),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999",
+          "2016-03-27 19:23:20", "2016-03-27 19:23:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999",
+          "2016-03-27 19:24:30", "2016-03-27 19:24:29.999999"))
+    )
+
+    // check column names
+    val df3 = df2
+      .select(
+        window_time($"window1").cast("string"),
+        window_time($"window2").cast("string"),
+        window_time($"window2").as("wt2_aliased").cast("string")
+      )
+
+    val schema = df3.schema
+
+    assert(schema.fields.exists(_.name == "window_time(window1)"))
+    assert(schema.fields.exists(_.name == "window_time(window2)"))
+    assert(schema.fields.exists(_.name == "wt2_aliased"))
   }
 
   test("window_time function on agg output") {
```