Skip to content

Commit

Permalink
Add pre-validation for output columns in mv query (opensearch-project…
Browse files Browse the repository at this point in the history
…#359)

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Jun 6, 2024
1 parent 689788a commit c5ad7e7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ object FlintSparkMaterializedView {
this
}

/**
 * Pre-validates the materialized view query, then delegates to the parent validation.
 *
 * A query whose output schema contains the same column name more than once is rejected. The
 * check is skipped when no query is set, which may be the case for an ALTER index statement.
 *
 * @param index
 *   index handed over unchanged to the parent validation
 * @return
 *   the result of the parent validation
 * @throws IllegalArgumentException
 *   if the query output contains duplicate column names
 */
override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
  if (query.nonEmpty) {
    // Only the schema of the query result is inspected here
    val columnNames = flint.spark.sql(query).schema.map(_.name)

    // A set drops repeated names, so a smaller size means at least one duplicate
    val hasUniqueNames = columnNames.toSet.size == columnNames.size
    require(hasUniqueNames, "Duplicate columns found in materialized view query output")
  }

  // Hand over to the parent for any remaining index validation
  super.validateIndex(index)
}

override protected def buildIndex(): FlintSparkIndex = {
// TODO: change here and FlintDS class to support complex field type in future
val outputSchema = flint.spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.must.Matchers.{defined, have}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
Expand Down Expand Up @@ -251,6 +251,26 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("start.time", "count")
}

// Register one test per query whose output repeats a column name,
// either by selecting the same column twice or by reusing an alias.
for {
  query <- Seq(
    s"SELECT name, name FROM $testTable",
    s"SELECT name AS dup_col, age AS dup_col FROM $testTable")
} {
  test(s"should fail to create materialized view if duplicate columns in $query") {
    // Capture the failure first so the message assertion reads on its own line
    val failure = the[IllegalArgumentException] thrownBy {
      withTempDir { checkpointDir =>
        sql(s"""
             | CREATE MATERIALIZED VIEW $testMvName
             | AS $query
             | WITH (
             | auto_refresh = true,
             | checkpoint_location = '${checkpointDir.getAbsolutePath}'
             | )
             |""".stripMargin)
      }
    }

    // require(...) prefixes its message with "requirement failed: "
    failure should have message "requirement failed: Duplicate columns found in materialized view query output"
  }
}

test("show all materialized views in catalog and database") {
// Show in catalog
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
Expand Down

0 comments on commit c5ad7e7

Please sign in to comment.