From b4db195fc24b02bb75d2184d36099c5b1c0d0c70 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 31 Oct 2024 09:36:14 -0700 Subject: [PATCH] Add IT for verifying correctness of subquery workaround Signed-off-by: Chen Dai --- ...FlintSparkMaterializedViewSqlITSuite.scala | 56 +++++++++++++++++-- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 78f800c1c..acd438eca 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -448,10 +448,10 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } - test("tumble function only supports simple column") { - val testTable2 = s"$catalogName.default.mv_test_tumble" - withTable(testTable2) { - createTableHttpLog(testTable2) + test("tumble function should raise error for non-simple time column") { + val httpLogs = s"$catalogName.default.mv_test_tumble" + withTable(httpLogs) { + createTableHttpLog(httpLogs) withTempDir { checkpointDir => val ex = the[IllegalStateException] thrownBy { @@ -461,7 +461,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | SELECT | window.start AS startTime, | COUNT(*) AS count - | FROM $testTable2 + | FROM $httpLogs | GROUP BY | TUMBLE(CAST(timestamp AS TIMESTAMP), '10 Minute') | WITH ( @@ -477,5 +477,51 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("tumble function should succeed with casted time column within subquery") { + val httpLogs = s"$catalogName.default.mv_test_tumble" + withTable(httpLogs) { + createTableHttpLog(httpLogs) + + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW 
`$catalogName`.`default`.`mv_test_metrics` + | AS + | SELECT + | window.start AS startTime, + | COUNT(*) AS count + | FROM ( + | SELECT CAST(timestamp AS TIMESTAMP) AS time + | FROM $httpLogs + | ) + | GROUP BY + | TUMBLE(time, '10 Minute') + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' + | ) + |""".stripMargin) + + // Wait for the streaming job to complete the current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + checkAnswer( + flint.queryIndex(testFlintIndex).select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 10:00:00"), 2), + Row(timestamp("2023-10-01 10:10:00"), 2) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 10:20:00"), 2) + */ + )) + } + } + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) }