From c5ad7e70bbaa863cde58cdb951676de28321edf4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 6 Jun 2024 11:37:36 -0700 Subject: [PATCH] Add pre-validation for output columns in mv query (#359) Signed-off-by: Chen Dai --- .../spark/mv/FlintSparkMaterializedView.scala | 16 ++++++++++++++ ...FlintSparkMaterializedViewSqlITSuite.scala | 22 ++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 74626d25d..d7c6ddf81 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -196,6 +196,22 @@ object FlintSparkMaterializedView { this } + override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = { + /* + * Validate if duplicate column names in the output schema. + * MV query may be empty in the case of ALTER index statement. + */ + if (query.nonEmpty) { + val outputColNames = flint.spark.sql(query).schema.map(_.name) + require( + outputColNames.distinct.length == outputColNames.length, + "Duplicate columns found in materialized view query output") + } + + // Continue to perform any additional index validation + super.validateIndex(index) + } + override protected def buildIndex(): FlintSparkIndex = { // TODO: change here and FlintDS class to support complex field type in future val outputSchema = flint.spark diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 8dfde3439..3a17cb8b1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -16,7 +16,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName -import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.must.Matchers.{defined, have} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -251,6 +251,26 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("start.time", "count") } + Seq( + s"SELECT name, name FROM $testTable", + s"SELECT name AS dup_col, age AS dup_col FROM $testTable") + .foreach { query => + test(s"should fail to create materialized view if duplicate columns in $query") { + the[IllegalArgumentException] thrownBy { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $query + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } + } should have message "requirement failed: Duplicate columns found in materialized view query output" + } + } + test("show all materialized views in catalog and database") { // Show in catalog flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()