Skip to content

Commit

Permalink
Add IT for verifying correctness of subquery workaround
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 31, 2024
1 parent cac4738 commit b4db195
Showing 1 changed file with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,10 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("tumble function only supports simple column") {
val testTable2 = s"$catalogName.default.mv_test_tumble"
withTable(testTable2) {
createTableHttpLog(testTable2)
test("tumble function should raise error for non-simple time column") {
val httpLogs = s"$catalogName.default.mv_test_tumble"
withTable(httpLogs) {
createTableHttpLog(httpLogs)

withTempDir { checkpointDir =>
val ex = the[IllegalStateException] thrownBy {
Expand All @@ -461,7 +461,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable2
| FROM $httpLogs
| GROUP BY
| TUMBLE(CAST(timestamp AS TIMESTAMP), '10 Minute')
| WITH (
Expand All @@ -477,5 +477,51 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("tumble function should succeed with casted time column within subquery") {
val httpLogs = s"$catalogName.default.mv_test_tumble"
withTable(httpLogs) {
createTableHttpLog(httpLogs)

withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW `$catalogName`.`default`.`mv_test_metrics`
| AS
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM (
| SELECT CAST(timestamp AS TIMESTAMP) AS time
| FROM $httpLogs
| )
| GROUP BY
| TUMBLE(time, '10 Minute')
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

checkAnswer(
flint.queryIndex(testFlintIndex).select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 10:00:00"), 2),
Row(timestamp("2023-10-01 10:10:00"), 2)
/*
* The last row is pending to fire upon watermark
* Row(timestamp("2023-10-01 10:20:00"), 2)
*/
))
}
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}

0 comments on commit b4db195

Please sign in to comment.