Skip to content

Commit

Permalink
Run iceberg tests as a separate suite.
Browse files Browse the repository at this point in the history
Not all tests need to run as iceberg. Instead only run the SQL tests as
iceberg in addition to running them as CSV to test base behavior.

Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Apr 5, 2024
1 parent 3423edd commit 2f4abe5
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if no skipping index") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
val query =
s"""
| SELECT name
Expand All @@ -340,10 +336,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if skipping index is logically deleted") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -366,10 +358,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build partition skipping index and rewrite applicable query") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -395,10 +383,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build value set skipping index and rewrite applicable query") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down Expand Up @@ -428,10 +412,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build min max skipping index and rewrite applicable query") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -458,10 +438,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build bloom filter skipping index and rewrite applicable query") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down Expand Up @@ -493,10 +469,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should rewrite applicable query with table name without database specified") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -515,10 +487,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if filtering condition has disjunction") {
assume(
System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import scala.collection.immutable.Map
import scala.concurrent.duration.TimeUnit
import scala.util.Try

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
Expand All @@ -30,44 +29,23 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

// Companion object for the MyTestSuite class
object TableOptions {
// Define the map here
val opts: Map[String, String] =
Map("csv" -> "OPTIONS (header 'false', delimiter '\t')", "iceberg" -> "")
}

/**
* Flint Spark suite trait that initializes [[FlintSpark]] API instance.
*/
trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite with StreamTest {

/** Flint Spark high level API being tested */
lazy protected val flint: FlintSpark = new FlintSpark(spark)
lazy protected val tableType: String = Option(System.getProperty("TABLE_TYPE")).getOrElse("CSV")
lazy protected val tableOptions: String =
TableOptions.opts.getOrElse(tableType.toLowerCase(), "")
lazy protected val tableType: String = "CSV"
lazy protected val tableOptions: String = "OPTIONS (header 'false', delimiter '\t')"

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
.set(HOST_ENDPOINT.key, openSearchHost)
.set(HOST_PORT.key, openSearchPort.toString)
.set(REFRESH_POLICY.key, "true")
// Disable mandatory checkpoint for test convenience
.set(CHECKPOINT_MANDATORY.key, "false")

if (tableType.equalsIgnoreCase("iceberg")) {
conf
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set(
"spark.sql.extensions",
List(
classOf[IcebergSparkSessionExtensions].getName,
classOf[FlintSparkExtensions].getName)
.mkString(", "))
}
conf
}

Expand All @@ -82,11 +60,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
FlintSparkIndexMonitor.executor = mockExecutor
}

override def afterAll(): Unit = {
deleteDirectory(s"spark-warehouse/${suiteName}")
super.afterAll()
}

protected def deleteTestIndex(testIndexNames: String*): Unit = {
testIndexNames.foreach(testIndex => {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkCoveringIndexSqlITSuite

class FlintSparkIcebergCoveringIndexITSuite
extends FlintSparkCoveringIndexSqlITSuite
with FlintSparkIcebergSuite {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite

class FlintSparkIcebergMaterializedViewITSuite
extends FlintSparkMaterializedViewSqlITSuite
with FlintSparkIcebergSuite {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkSkippingIndexSqlITSuite

class FlintSparkIcebergSkippingIndexITSuite
extends FlintSparkSkippingIndexSqlITSuite
with FlintSparkIcebergSuite {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.iceberg

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
import org.opensearch.flint.spark.FlintSparkExtensions
import org.opensearch.flint.spark.FlintSparkSuite

import org.apache.spark.SparkConf

/**
* Flint Spark suite tailored for Iceberg.
*/
trait FlintSparkIcebergSuite extends FlintSparkSuite {

// Override table type to Iceberg for this suite
override lazy protected val tableType: String = "iceberg"

// You can also override tableOptions if Iceberg requires different options
override lazy protected val tableOptions: String = ""

// Override the sparkConf method to include Iceberg-specific configurations
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
// Set Iceberg-specific Spark configurations
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
.set(
"spark.sql.extensions",
List(
classOf[IcebergSparkSessionExtensions].getName,
classOf[FlintSparkExtensions].getName).mkString(", "))
conf
}

override def afterAll(): Unit = {
deleteDirectory(s"spark-warehouse/${suiteName}")
super.afterAll()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.flint.spark.ppl

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions, FlintSparkSuite}

import org.apache.spark.SparkConf
Expand All @@ -20,10 +19,7 @@ trait FlintPPLSuite extends FlintSparkSuite {
val conf = super.sparkConf
.set(
"spark.sql.extensions",
List(
classOf[IcebergSparkSessionExtensions].getName,
classOf[FlintPPLSparkExtensions].getName,
classOf[FlintSparkExtensions].getName)
List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
.mkString(", "))
.set(OPTIMIZER_RULE_ENABLED.key, "false")
conf
Expand Down

0 comments on commit 2f4abe5

Please sign in to comment.