Pre-validate duplicate columns in materialized view query #359

Merged
@@ -196,6 +196,22 @@ object FlintSparkMaterializedView {
      this
    }

    override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
      /*
       * Validate that there are no duplicate column names in the output schema.
       * The MV query may be empty in the case of an ALTER index statement.
       */
      if (query.nonEmpty) {
        val outputColNames = flint.spark.sql(query).schema.map(_.name)
Collaborator:
Just curious: this will infer the returned schema instead of actually running the query, right?

Collaborator (Author):
Yes; for DQL, flint.spark.sql(query).show() is what would trigger actual execution.
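For illustration only (not part of this diff), a minimal standalone sketch of that point: reading the schema resolves the analyzed query plan without running a job, so a duplicate-column check can be done up front. The local SparkSession, the "people" temp view, and the sample queries below are assumptions made for the example.

// Sketch: schema access analyzes the plan; only an action such as show() executes the query.
import org.apache.spark.sql.SparkSession

object SchemaOnlyValidationSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("schema-only-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical sample data for the example.
  Seq(("alice", 30), ("bob", 25)).toDF("name", "age").createOrReplaceTempView("people")

  def hasDuplicateOutputColumns(query: String): Boolean = {
    // .schema is taken from the analyzed logical plan; no Spark job runs here.
    val outputColNames = spark.sql(query).schema.map(_.name)
    outputColNames.distinct.length != outputColNames.length
  }

  println(hasDuplicateOutputColumns("SELECT name, age FROM people"))  // false
  println(hasDuplicateOutputColumns("SELECT name, name FROM people")) // true

  // An action such as the following is what would actually execute the query:
  // spark.sql("SELECT name, age FROM people").show()

  spark.stop()
}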

        require(
          outputColNames.distinct.length == outputColNames.length,
          "Duplicate columns found in materialized view query output")
      }

      // Continue to perform any additional index validation
      super.validateIndex(index)
    }

    override protected def buildIndex(): FlintSparkIndex = {
      // TODO: change here and FlintDS class to support complex field type in future
      val outputSchema = flint.spark
@@ -16,7 +16,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
-import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.must.Matchers.{defined, have}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
@@ -251,6 +251,26 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
    metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("start.time", "count")
  }

  Seq(
    s"SELECT name, name FROM $testTable",
    s"SELECT name AS dup_col, age AS dup_col FROM $testTable")
    .foreach { query =>
      test(s"should fail to create materialized view if duplicate columns in $query") {
        the[IllegalArgumentException] thrownBy {
          withTempDir { checkpointDir =>
            sql(s"""
                 | CREATE MATERIALIZED VIEW $testMvName
                 | AS $query
                 | WITH (
                 |   auto_refresh = true,
                 |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
                 | )
                 |""".stripMargin)
          }
        } should have message "requirement failed: Duplicate columns found in materialized view query output"
      }
    }
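The expected message carries the "requirement failed: " prefix because the validation uses Scala's require, which throws IllegalArgumentException with that prefix prepended to the supplied message. A minimal standalone sketch of that behavior (not part of the test suite):

// Sketch: require prepends "requirement failed: " to the given message.
object RequireMessageSketch extends App {
  try {
    require(false, "Duplicate columns found in materialized view query output")
  } catch {
    case e: IllegalArgumentException =>
      // Prints: requirement failed: Duplicate columns found in materialized view query output
      println(e.getMessage)
  }
}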

test("show all materialized views in catalog and database") {
// Show in catalog
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()