From b52c75f79747a708cbfdce2286a18a686a59588e Mon Sep 17 00:00:00 2001
From: Adi Suresh
Date: Fri, 5 Apr 2024 03:25:30 +0000
Subject: [PATCH] Run Iceberg tests as a separate suite.

Not all tests need to run as Iceberg. Instead, only the SQL tests run as
Iceberg, in addition to running as CSV to test the base behavior. The
Iceberg-specific Spark configuration moves out of FlintSparkSuite into a
new FlintSparkIcebergSuite trait, and Iceberg variants of the SQL test
suites mix that trait in.

Signed-off-by: Adi Suresh
---
 .../FlintSparkSkippingIndexITSuite.scala      | 32 -------------
 .../flint/spark/FlintSparkSuite.scala         | 31 +------------
 ...lintSparkIcebergCoveringIndexITSuite.scala | 12 +++++
 ...tSparkIcebergMaterializedViewITSuite.scala | 12 +++++
 ...lintSparkIcebergSkippingIndexITSuite.scala | 12 +++++
 .../iceberg/FlintSparkIcebergSuite.scala      | 45 +++++++++++++++++++
 .../flint/spark/ppl/FlintPPLSuite.scala       |  6 +--
 7 files changed, 84 insertions(+), 66 deletions(-)
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSkippingIndexITSuite.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index d3e915760..999fb3008 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -322,10 +322,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if no skipping index") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     val query =
       s"""
          | SELECT name
@@ -341,10 +337,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if skipping index is logically deleted") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -367,10 +359,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build partition skipping index and rewrite applicable query") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -396,10 +384,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build value set skipping index and rewrite applicable query") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -429,10 +413,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build min max skipping index and rewrite applicable query") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -459,10 +439,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build bloom filter skipping index and rewrite applicable query") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -494,10 +470,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should rewrite applicable query with table name without database specified") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -516,10 +488,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if filtering condition has disjunction") {
-    assume(
-      System.getProperty("TABLE_TYPE") != "iceberg",
-      """Test disabled for Iceberg because Iceberg tables have a built-in
-        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 3e0fdee9f..0c6282bb6 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -13,7 +13,6 @@ import scala.collection.immutable.Map
 import scala.concurrent.duration.TimeUnit
 import scala.util.Try
 
-import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
@@ -30,13 +29,6 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
 import org.apache.spark.sql.streaming.StreamTest
 
-// Companion object for the MyTestSuite class
-object TableOptions {
-  // Define the map here
-  val opts: Map[String, String] =
-    Map("csv" -> "OPTIONS (header 'false', delimiter '\t')", "iceberg" -> "")
-}
-
 /**
  * Flint Spark suite trait that initializes [[FlintSpark]] API instance.
 */
@@ -44,30 +36,16 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
 
   /** Flint Spark high level API being tested */
   lazy protected val flint: FlintSpark = new FlintSpark(spark)
 
-  lazy protected val tableType: String = Option(System.getProperty("TABLE_TYPE")).getOrElse("CSV")
-  lazy protected val tableOptions: String =
-    TableOptions.opts.getOrElse(tableType.toLowerCase(), "")
+  lazy protected val tableType: String = "CSV"
+  lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')"
 
   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
-      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
-      .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
       .set(HOST_ENDPOINT.key, openSearchHost)
       .set(HOST_PORT.key, openSearchPort.toString)
       .set(REFRESH_POLICY.key, "true")
       // Disable mandatory checkpoint for test convenience
       .set(CHECKPOINT_MANDATORY.key, "false")
-
-    if (tableType.equalsIgnoreCase("iceberg")) {
-      conf
-        .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
-        .set(
-          "spark.sql.extensions",
-          List(
-            classOf[IcebergSparkSessionExtensions].getName,
-            classOf[FlintSparkExtensions].getName)
-            .mkString(", "))
-    }
     conf
   }
@@ -82,11 +60,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     FlintSparkIndexMonitor.executor = mockExecutor
   }
 
-  override def afterAll(): Unit = {
-    deleteDirectory(s"spark-warehouse/${suiteName}")
-    super.afterAll()
-  }
-
   protected def deleteTestIndex(testIndexNames: String*): Unit = {
     testIndexNames.foreach(testIndex => {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala
new file mode 100644
index 000000000..2675ef0cd
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergCoveringIndexITSuite.scala
@@ -0,0 +1,12 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.iceberg
+
+import org.opensearch.flint.spark.FlintSparkCoveringIndexSqlITSuite
+
+class FlintSparkIcebergCoveringIndexITSuite
+    extends FlintSparkCoveringIndexSqlITSuite
+    with FlintSparkIcebergSuite {}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
new file mode 100644
index 000000000..ffb8a7d1b
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
@@ -0,0 +1,12 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.iceberg
+
+import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
+
+class FlintSparkIcebergMaterializedViewITSuite
+    extends FlintSparkMaterializedViewSqlITSuite
+    with FlintSparkIcebergSuite {}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSkippingIndexITSuite.scala
new file mode 100644
index 000000000..ba24e3b2b
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSkippingIndexITSuite.scala
@@ -0,0 +1,12 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.iceberg
+
+import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite
+
+class FlintSparkIcebergSkippingIndexITSuite
+    extends FlintSparkSkippingIndexSqlITSuite
+    with FlintSparkIcebergSuite {}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala
new file mode 100644
index 000000000..2ae0d157a
--- /dev/null
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.iceberg
+
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+import org.opensearch.flint.spark.FlintSparkExtensions
+import org.opensearch.flint.spark.FlintSparkSuite
+
+import org.apache.spark.SparkConf
+
+/**
+ * Flint Spark suite tailored for Iceberg.
+ */
+trait FlintSparkIcebergSuite extends FlintSparkSuite {
+
+  // Override table type to Iceberg for this suite
+  override lazy protected val tableType: String = "iceberg"
+
+  // You can also override tableOptions if Iceberg requires different options
+  override lazy protected val tableOptions: String = ""
+
+  // Override the sparkConf method to include Iceberg-specific configurations
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+      // Set Iceberg-specific Spark configurations
+      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+      .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
+      .set(
+        "spark.sql.extensions",
+        List(
+          classOf[IcebergSparkSessionExtensions].getName,
+          classOf[FlintSparkExtensions].getName).mkString(", "))
+    conf
+  }
+
+  override def afterAll(): Unit = {
+    deleteDirectory(s"spark-warehouse/${suiteName}")
+    super.afterAll()
+  }
+
+}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
index 7bae3173d..1ece33ce1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
@@ -5,7 +5,6 @@
 
 package org.opensearch.flint.spark.ppl
 
-import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions, FlintSparkSuite}
 
 import org.apache.spark.SparkConf
@@ -20,10 +19,7 @@ trait FlintPPLSuite extends FlintSparkSuite {
     val conf = super.sparkConf
       .set(
         "spark.sql.extensions",
-        List(
-          classOf[IcebergSparkSessionExtensions].getName,
-          classOf[FlintPPLSparkExtensions].getName,
-          classOf[FlintSparkExtensions].getName)
+        List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
           .mkString(", "))
       .set(OPTIMIZER_RULE_ENABLED.key, "false")
     conf
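
Note (not part of the patch): with FlintSparkIcebergSuite in place, giving any
other SQL IT suite Iceberg coverage is a two-line class, following the same
pattern as the three suites added above. A minimal sketch, where
FlintSparkSomeSqlITSuite and FlintSparkIcebergSomeSqlITSuite are hypothetical
placeholder names; any suite deriving from FlintSparkSuite works the same way:

    package org.opensearch.flint.spark.iceberg

    // Hypothetical base suite; substitute any SQL IT suite that
    // extends FlintSparkSuite.
    import org.opensearch.flint.spark.FlintSparkSomeSqlITSuite

    class FlintSparkIcebergSomeSqlITSuite
        extends FlintSparkSomeSqlITSuite
        with FlintSparkIcebergSuite {}

Mixing FlintSparkIcebergSuite in last matters: Scala trait linearization then
resolves tableType, tableOptions, and sparkConf to the Iceberg overrides, and
because the trait's sparkConf calls super.sparkConf, the Iceberg catalog,
warehouse, and session extensions are layered on top of whatever the base
suite configures.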