From 711e1a5cc269a2f39ff6384e8be780338a7d5943 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 23 Oct 2023 14:57:26 -0700 Subject: [PATCH 1/2] Add IT to verify identifier is unquoted properly Signed-off-by: Chen Dai --- .../FlintSparkCoveringIndexSqlITSuite.scala | 17 ++++++++++++- ...FlintSparkMaterializedViewSqlITSuite.scala | 24 ++++++++++++++++++- .../FlintSparkSkippingIndexSqlITSuite.scala | 20 +++++++++++++++- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 0d3f7a887..2d75de8cc 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark import scala.Option.empty -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse @@ -179,6 +179,21 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { |""".stripMargin) } + test("create covering index with quoted index, table and column name") { + sql(s""" + | CREATE INDEX `$testIndex` ON `spark_catalog`.`default`.`covering_sql_test` + | (`name`, `age`) + | """.stripMargin) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + + val metadata = index.get.metadata() + metadata.name shouldBe testIndex + metadata.source shouldBe testTable + metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("name", "age") + } + test("show all covering index on the source table") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala 
b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 15cd6443e..f956920fa 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -8,7 +8,7 @@ package org.opensearch.flint.spark import java.sql.Timestamp import scala.Option.empty -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse @@ -139,6 +139,28 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") } + test("create materialized view with quoted name and column name") { + val testQuotedQuery = + """ SELECT + | window.start AS `startTime`, + | COUNT(*) AS `count` + | FROM `spark_catalog`.`default`.`mv_test` + | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim + + sql(s""" + | CREATE MATERIALIZED VIEW `spark_catalog`.`default`.`mv_test_metrics` + | AS $testQuotedQuery + |""".stripMargin) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + + val metadata = index.get.metadata() + metadata.name shouldBe testMvName + metadata.source shouldBe testQuotedQuery + metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("startTime", "count") + } + test("show all materialized views in catalog and database") { // Show in catalog flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index dbd349b63..343f304c3 100644 --- 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -6,7 +6,7 @@ package org.opensearch.flint.spark import scala.Option.empty -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse @@ -150,6 +150,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { | """.stripMargin) } + test("create skipping index with quoted table and column name") { + sql(s""" + | CREATE SKIPPING INDEX ON `spark_catalog`.`default`.`skipping_sql_test` + | ( + | `year` PARTITION, + | `name` VALUE_SET, + | `age` MIN_MAX + | ) + | """.stripMargin) + + val index = flint.describeIndex(testIndex) + index shouldBe defined + + val metadata = index.get.metadata() + metadata.source shouldBe testTable + metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("year", "name", "age") + } + test("describe skipping index") { flint .skippingIndex() From 00c203abf94c34d610d5101bf6f165db18186682 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 24 Oct 2023 14:07:52 -0700 Subject: [PATCH 2/2] Fix dotted column name issue with IT Signed-off-by: Chen Dai --- .../scala/org/opensearch/flint/spark/FlintSparkIndex.scala | 3 ++- .../flint/spark/FlintSparkMaterializedViewSqlITSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 0586bfc49..fe5329739 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -135,9 +135,10 @@ object 
FlintSparkIndex { } def generateSchemaJSON(allFieldTypes: Map[String, String]): String = { + // Backtick column names to escape special characters, otherwise fromDDL() will fail val catalogDDL = allFieldTypes - .map { case (colName, colType) => s"$colName $colType not null" } + .map { case (colName, colType) => s"`$colName` $colType not null" } .mkString(",") val structType = StructType.fromDDL(catalogDDL) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index f956920fa..e57a95fdb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -142,7 +142,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("create materialized view with quoted name and column name") { val testQuotedQuery = """ SELECT - | window.start AS `startTime`, + | window.start AS `start.time`, | COUNT(*) AS `count` | FROM `spark_catalog`.`default`.`mv_test` | GROUP BY TUMBLE(`time`, '10 Minutes')""".stripMargin.trim @@ -158,7 +158,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { val metadata = index.get.metadata() metadata.name shouldBe testMvName metadata.source shouldBe testQuotedQuery - metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("startTime", "count") + metadata.indexedColumns.map(_.asScala("columnName")) shouldBe Seq("start.time", "count") } test("show all materialized views in catalog and database") {