Skip to content

Commit

Permalink
Add validation for time column in tumble function (#858)
Browse files Browse the repository at this point in the history
* Validate tumble function argument and add IT

Signed-off-by: Chen Dai <[email protected]>

* Add IT for verifying correctness of subquery workaround

Signed-off-by: Chen Dai <[email protected]>

* Modify error message wording

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Nov 7, 2024
1 parent 31d04f1 commit 182689c
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ case class FlintSparkMaterializedView(

// Assume first aggregate item must be time column
val winFunc = winFuncs.head
val timeCol = winFunc.arguments.head.asInstanceOf[Attribute]
Some(agg, timeCol)
val timeCol = winFunc.arguments.head
timeCol match {
case attr: Attribute =>
Some(agg, attr)
case _ =>
throw new IllegalArgumentException(
s"Tumble function only supports simple timestamp column, but found: $timeCol")
}
}

private def isWindowingFunction(func: UnresolvedFunction): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,5 +448,80 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("tumble function should raise error for non-simple time column") {
val httpLogs = s"$catalogName.default.mv_test_tumble"
withTable(httpLogs) {
createTableHttpLog(httpLogs)

withTempDir { checkpointDir =>
val ex = the[IllegalStateException] thrownBy {
sql(s"""
| CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics`
| AS
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $httpLogs
| GROUP BY
| TUMBLE(CAST(timestamp AS TIMESTAMP), '10 Minute')
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
|""".stripMargin)
}
ex.getCause should have message
"Tumble function only supports simple timestamp column, but found: cast('timestamp as timestamp)"
}
}
}

test("tumble function should succeed with casted time column within subquery") {
val httpLogs = s"$catalogName.default.mv_test_tumble"
withTable(httpLogs) {
createTableHttpLog(httpLogs)

withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics`
| AS
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM (
| SELECT CAST(timestamp AS TIMESTAMP) AS time
| FROM $httpLogs
| )
| GROUP BY
| TUMBLE(time, '10 Minute')
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

checkAnswer(
flint.queryIndex(testFlintIndex).select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 10:00:00"), 2),
Row(timestamp("2023-10-01 10:10:00"), 2)
/*
* The last row is pending to fire upon watermark
* Row(timestamp("2023-10-01 10:20:00"), 2)
*/
))
}
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}

0 comments on commit 182689c

Please sign in to comment.