Skip to content

Commit

Permalink
Add more IT for MV validation
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Apr 19, 2024
1 parent 16a7b79 commit 7f6c7a7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generat
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
Expand Down Expand Up @@ -122,10 +124,9 @@ case class FlintSparkMaterializedView(
func
}

if (winFuncs.size != 1) {
throw new IllegalStateException(
"A windowing function is required for incremental refresh with aggregation")
}
require(
winFuncs.size == 1,
"A windowing function is required for incremental refresh with aggregation")

// Assume first aggregate item must be time column
val winFunc = winFuncs.head
Expand Down Expand Up @@ -198,6 +199,13 @@ object FlintSparkMaterializedView {

override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
super.validateIndex(index)

// Pre-build to ensure windowing function and watermark valid
val refreshMode = FlintSparkIndexRefresh.create(index.name(), index).refreshMode
if (refreshMode == AUTO || refreshMode == INCREMENTAL) {
index.asInstanceOf[StreamingRefresh].buildStream(flint.spark)
}
index
}

override protected def buildIndex(): FlintSparkIndex = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,18 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

test("should fail if materialized view query has syntax error") {
the[ParseException] thrownBy {
// Wrong syntax due to incomplete WITH clause
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS
| SELECT time FROM $testTable WITH (
| SELECT time FROM $testTable WITH
| """.stripMargin)
}
}

test("should fail if materialized view query has semantic error") {
the[AnalysisException] thrownBy {
// Non-existent time1 column
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS
Expand All @@ -216,6 +218,41 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("should fail if no windowing function for aggregated query") {
withTempDir { checkpointDir =>
the[IllegalArgumentException] thrownBy {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS
| SELECT COUNT(*) FROM $testTable GROUP BY time
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
| """.stripMargin)
} should have message
"requirement failed: A windowing function is required for incremental refresh with aggregation"
}
}

test("should fail if no watermark delay for aggregated query") {
withTempDir { checkpointDir =>
the[IllegalArgumentException] thrownBy {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS
| $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
| """.stripMargin)
} should have message
"requirement failed: watermark delay is required for auto refresh and incremental refresh with aggregation"
}
}

test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") {
val tableName = "spark_catalog.default.issue112"
createTableIssue112(tableName)
Expand Down

0 comments on commit 7f6c7a7

Please sign in to comment.