diff --git a/build.sbt b/build.sbt
index 95324fc99..bcf21e444 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,6 +7,10 @@ import Dependencies._
 lazy val scala212 = "2.12.14"
 lazy val sparkVersion = "3.3.2"
 lazy val opensearchVersion = "2.6.0"
+lazy val icebergVersion = "1.5.0"
+
+val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
+val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")
 
 ThisBuild / organization := "org.opensearch"
 
@@ -172,6 +176,8 @@ lazy val integtest = (project in file("integ-test"))
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
       "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
       "org.testcontainers" % "testcontainers" % "1.18.0" % "test",
+      "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
+      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
       // add opensearch-java client to get node stats
       "org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
         exclude ("com.fasterxml.jackson.core", "jackson-databind")),
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 01b941187..7ac0a921c 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
   /** Test table and index name */
-  private val testTable = "spark_catalog.default.test"
+  private val testTable = "spark_catalog.default.skipping_test"
   private val testIndex = getSkippingIndexName(testTable)
   private val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes)
 
@@ -42,11 +42,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   override def afterEach(): Unit = {
-    super.afterEach()
-
     // Delete all test indices
     deleteTestIndex(testIndex)
     sql(s"DROP TABLE $testTable")
+    super.afterEach()
   }
 
   test("create skipping index with metadata successfully") {
@@ -63,7 +62,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     index shouldBe defined
     index.get.metadata().getContent should matchJson(s"""{
        | "_meta": {
-       |   "name": "flint_spark_catalog_default_test_skipping_index",
+       |   "name": "flint_spark_catalog_default_skipping_test_skipping_index",
       |   "version": "${current()}",
        |   "kind": "skipping",
        |   "indexedColumns": [
@@ -101,7 +100,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
        |     "columnName": "name",
        |     "columnType": "string"
        |   }],
-       |   "source": "spark_catalog.default.test",
+       |   "source": "spark_catalog.default.skipping_test",
        |   "options": {
        |     "auto_refresh": "false",
        |     "incremental_refresh": "false"
@@ -322,6 +321,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if no skipping index") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     val query =
       s"""
          | SELECT name
@@ -337,6 +340,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if skipping index is logically deleted") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -359,6 +366,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build partition skipping index and rewrite applicable query") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -384,6 +395,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build value set skipping index and rewrite applicable query") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -413,6 +428,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build min max skipping index and rewrite applicable query") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -439,6 +458,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("can build bloom filter skipping index and rewrite applicable query") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -470,6 +493,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should rewrite applicable query with table name without database specified") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
@@ -479,7 +506,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     // Table name without database name "default"
     val query = sql(s"""
          | SELECT name
-         | FROM test
+         | FROM skipping_test
          | WHERE year = 2023
          |""".stripMargin)
 
@@ -488,6 +515,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
   }
 
   test("should not rewrite original query if filtering condition has disjunction") {
+    assume(
+      System.getProperty("TABLE_TYPE") != "iceberg",
+      """Test disabled for Iceberg because Iceberg tables have a built-in
+        skipping index which rewrites queries""")
     flint
       .skippingIndex()
       .onTable(testTable)
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 9116fe40b..3e0fdee9f 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -5,11 +5,15 @@
 
 package org.opensearch.flint.spark
 
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.util.Comparator
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}
 
 import scala.collection.immutable.Map
 import scala.concurrent.duration.TimeUnit
+import scala.util.Try
 
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
@@ -21,6 +25,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks.forAll
 import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.FlintSuite
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
 import org.apache.spark.sql.streaming.StreamTest
@@ -28,10 +33,8 @@ import org.apache.spark.sql.streaming.StreamTest
 // Companion object for the MyTestSuite class
 object TableOptions {
   // Define the map here
-  val opts: Map[String, String] = Map(
-    "csv" -> "OPTIONS (header 'false', delimiter '\t')",
-    "iceberg" -> ""
-  )
+  val opts: Map[String, String] =
+    Map("csv" -> "OPTIONS (header 'false', delimiter '\t')", "iceberg" -> "")
 }
 
 /**
@@ -42,18 +45,35 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
   /** Flint Spark high level API being tested */
   lazy protected val flint: FlintSpark = new FlintSpark(spark)
   lazy protected val tableType: String = Option(System.getProperty("TABLE_TYPE")).getOrElse("CSV")
-  lazy protected val tableOptions: String = TableOptions.opts.getOrElse(tableType.toLowerCase(), "")
+  lazy protected val tableOptions: String =
+    TableOptions.opts.getOrElse(tableType.toLowerCase(), "")
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+      .set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
+      .set(HOST_ENDPOINT.key, openSearchHost)
+      .set(HOST_PORT.key, openSearchPort.toString)
+      .set(REFRESH_POLICY.key, "true")
+      // Disable mandatory checkpoint for test convenience
+      .set(CHECKPOINT_MANDATORY.key, "false")
+
+    if (tableType.equalsIgnoreCase("iceberg")) {
+      conf
+        .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+        .set(
+          "spark.sql.extensions",
+          List(
+            classOf[IcebergSparkSessionExtensions].getName,
+            classOf[FlintSparkExtensions].getName)
+            .mkString(", "))
+    }
+    conf
+  }
 
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
-    setFlintSparkConf(HOST_PORT, openSearchPort)
-    setFlintSparkConf(REFRESH_POLICY, "true")
-
-    // Disable mandatory checkpoint for test convenience
-    setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
-
     // Replace executor to avoid impact on IT.
     // TODO: Currently no IT test scheduler so no need to restore it back.
     val mockExecutor = mock[ScheduledExecutorService]
@@ -62,6 +82,11 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     FlintSparkIndexMonitor.executor = mockExecutor
   }
 
+  override def afterAll(): Unit = {
+    deleteDirectory(s"spark-warehouse/${suiteName}")
+    super.afterAll()
+  }
+
   protected def deleteTestIndex(testIndexNames: String*): Unit = {
     testIndexNames.foreach(testIndex => {
@@ -86,6 +111,16 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     })
   }
 
+  def deleteDirectory(dirPath: String): Try[Unit] = {
+    Try {
+      val directory = Paths.get(dirPath)
+      Files
+        .walk(directory)
+        .sorted(Comparator.reverseOrder())
+        .forEach(Files.delete(_))
+    }
+  }
+
   protected def awaitStreamingComplete(jobId: String): Unit = {
     val job = spark.streams.get(jobId)
     failAfter(streamingTimeout) {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
index 667d3cfa6..7bae3173d 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintPPLSuite.scala
@@ -5,6 +5,7 @@
 
 package org.opensearch.flint.spark.ppl
 
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
 import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions, FlintSparkSuite}
 
 import org.apache.spark.SparkConf
@@ -16,18 +17,13 @@ import org.apache.spark.sql.test.SharedSparkSession
 
 trait FlintPPLSuite extends FlintSparkSuite {
   override protected def sparkConf: SparkConf = {
-    val conf = new SparkConf()
-      .set("spark.ui.enabled", "false")
-      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
-      .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
-      // Disable ConvertToLocalRelation for better test coverage. Test cases built on
-      // LocalRelation will exercise the optimization rules better by disabling it as
-      // this rule may potentially block testing of other optimization rules such as
-      // ConstantPropagation etc.
-      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+    val conf = super.sparkConf
       .set(
         "spark.sql.extensions",
-        List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
+        List(
+          classOf[IcebergSparkSessionExtensions].getName,
+          classOf[FlintPPLSparkExtensions].getName,
+          classOf[FlintSparkExtensions].getName)
           .mkString(", "))
       .set(OPTIMIZER_RULE_ENABLED.key, "false")
     conf
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala
index b6f2e8e0b..38fdcdbb9 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLCorrelationITSuite.scala
@@ -620,7 +620,15 @@ class FlintSparkPPLCorrelationITSuite
       Row(70000.0, "Canada", 50L),
       Row(95000.0, "USA", 40L))
 
-    implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](2))
+    // Define ordering for rows that first compares by age then by name
+    implicit val rowOrdering: Ordering[Row] = new Ordering[Row] {
+      def compare(x: Row, y: Row): Int = {
+        val ageCompare = x.getAs[Long](2).compareTo(y.getAs[Long](2))
+        if (ageCompare != 0) ageCompare
+        else x.getAs[String](1).compareTo(y.getAs[String](1))
+      }
+    }
+
     // Compare the results
     assert(results.sorted.sameElements(expectedResults.sorted))
 
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
index 25f5b4be7..fbae03fff 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala
@@ -232,12 +232,16 @@ class FlintSparkPPLTimeWindowITSuite
         "prod3",
         Timestamp.valueOf("2023-05-03 17:00:00"),
         Timestamp.valueOf("2023-05-04 17:00:00")))
-    // Compare the results
-    implicit val timestampOrdering: Ordering[Timestamp] = new Ordering[Timestamp] {
-      def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+
+    // Define ordering for rows that first compares by the timestamp and then by the productId
+    implicit val rowOrdering: Ordering[Row] = new Ordering[Row] {
+      def compare(x: Row, y: Row): Int = {
+        val dateCompare = x.getAs[Timestamp](2).compareTo(y.getAs[Timestamp](2))
+        if (dateCompare != 0) dateCompare
+        else x.getAs[String](1).compareTo(y.getAs[String](1))
+      }
     }
-    implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](2))
 
     assert(results.sorted.sameElements(expectedResults.sorted))
 
     // Retrieve the logical plan
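
For reference, the minor-version strings introduced at the top of build.sbt are derived by splitting the full version on "." and keeping the first two components, so with sparkVersion = "3.3.2" the new test dependency resolves to the iceberg-spark-runtime-3.3 artifact (sbt's %% additionally appends the Scala binary suffix, e.g. _2.12 for Scala 2.12.14). A minimal sketch in plain Scala, using the same values as the build file:

    // Same derivation as build.sbt: split "3.3.2" on literal dots, keep major.minor only.
    val sparkVersion = "3.3.2"
    val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")
    assert(sparkMinorVersion == "3.3")
    // The Iceberg test dependency is therefore s"iceberg-spark-runtime-$sparkMinorVersion",
    // i.e. iceberg-spark-runtime-3.3, matching the Spark minor version used by the project.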