Skip to content

Commit

Permalink
Add capability to test against iceberg tables.
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Apr 2, 2024
1 parent e7d57f9 commit ad543b4
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 28 deletions.
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import Dependencies._
lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
lazy val opensearchVersion = "2.6.0"
lazy val icebergVersion = "1.5.0"

val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

Expand Down Expand Up @@ -172,6 +176,8 @@ lazy val integtest = (project in file("integ-test"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

/** Test table and index name */
private val testTable = "spark_catalog.default.test"
private val testTable = "spark_catalog.default.skipping_test"
private val testIndex = getSkippingIndexName(testTable)
private val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes)

Expand All @@ -42,11 +42,10 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

override def afterEach(): Unit = {
super.afterEach()

// Delete all test indices
deleteTestIndex(testIndex)
sql(s"DROP TABLE $testTable")
super.afterEach()
}

test("create skipping index with metadata successfully") {
Expand All @@ -63,7 +62,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_test_skipping_index",
| "name": "flint_spark_catalog_default_skipping_test_skipping_index",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
Expand Down Expand Up @@ -101,7 +100,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnName": "name",
| "columnType": "string"
| }],
| "source": "spark_catalog.default.test",
| "source": "spark_catalog.default.skipping_test",
| "options": {
| "auto_refresh": "false",
| "incremental_refresh": "false"
Expand Down Expand Up @@ -322,6 +321,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if no skipping index") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
val query =
s"""
| SELECT name
Expand All @@ -337,6 +339,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if skipping index is logically deleted") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -359,6 +364,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build partition skipping index and rewrite applicable query") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -384,6 +392,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build value set skipping index and rewrite applicable query") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down Expand Up @@ -413,6 +424,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build min max skipping index and rewrite applicable query") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -439,6 +453,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("can build bloom filter skipping index and rewrite applicable query") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down Expand Up @@ -470,6 +487,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should rewrite applicable query with table name without database specified") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -479,7 +499,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
// Table name without database name "default"
val query = sql(s"""
| SELECT name
| FROM test
| FROM skipping_test
| WHERE year = 2023
|""".stripMargin)

Expand All @@ -488,6 +508,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("should not rewrite original query if filtering condition has disjunction") {
assume(System.getProperty("TABLE_TYPE") != "iceberg",
"""Test disabled for Iceberg because Iceberg tables have a built-in
skipping index which rewrites queries""")
flint
.skippingIndex()
.onTable(testTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@

package org.opensearch.flint.spark

import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.Comparator
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}

import scala.collection.immutable.Map
import scala.concurrent.duration.TimeUnit
import scala.util.Try

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
Expand All @@ -21,6 +25,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks.forAll
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.FlintSuite
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest
Expand All @@ -44,16 +49,32 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
lazy protected val tableType: String = Option(System.getProperty("TABLE_TYPE")).getOrElse("CSV")
lazy protected val tableOptions: String = TableOptions.opts.getOrElse(tableType.toLowerCase(), "")

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", s"spark-warehouse/${suiteName}")
.set(HOST_ENDPOINT.key, openSearchHost)
.set(HOST_PORT.key, openSearchPort.toString)
.set(REFRESH_POLICY.key, "true")
// Disable mandatory checkpoint for test convenience
.set(CHECKPOINT_MANDATORY.key, "false")

if (tableType.equalsIgnoreCase("iceberg")) {
conf
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set(
"spark.sql.extensions",
List(
classOf[IcebergSparkSessionExtensions].getName,
classOf[FlintSparkExtensions].getName)
.mkString(", "))
}
conf
}

override def beforeAll(): Unit = {
super.beforeAll()

setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
setFlintSparkConf(HOST_PORT, openSearchPort)
setFlintSparkConf(REFRESH_POLICY, "true")

// Disable mandatory checkpoint for test convenience
setFlintSparkConf(CHECKPOINT_MANDATORY, "false")

// Replace executor to avoid impact on IT.
// TODO: Currently no IT test scheduler so no need to restore it back.
val mockExecutor = mock[ScheduledExecutorService]
Expand All @@ -62,6 +83,11 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
FlintSparkIndexMonitor.executor = mockExecutor
}

override def afterAll(): Unit = {
deleteDirectory(s"spark-warehouse/${suiteName}")
super.afterAll()
}

protected def deleteTestIndex(testIndexNames: String*): Unit = {
testIndexNames.foreach(testIndex => {

Expand All @@ -86,6 +112,15 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
})
}

def deleteDirectory(dirPath: String): Try[Unit] = {
Try {
val directory = Paths.get(dirPath)
Files.walk(directory)
.sorted(Comparator.reverseOrder())
.forEach(Files.delete(_))
}
}

protected def awaitStreamingComplete(jobId: String): Unit = {
val job = spark.streams.get(jobId)
failAfter(streamingTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.spark.ppl

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions, FlintSparkSuite}

import org.apache.spark.SparkConf
Expand All @@ -16,18 +17,13 @@ import org.apache.spark.sql.test.SharedSparkSession

trait FlintPPLSuite extends FlintSparkSuite {
override protected def sparkConf: SparkConf = {
val conf = new SparkConf()
.set("spark.ui.enabled", "false")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
.set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
val conf = super.sparkConf
.set(
"spark.sql.extensions",
List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
List(
classOf[IcebergSparkSessionExtensions].getName,
classOf[FlintPPLSparkExtensions].getName,
classOf[FlintSparkExtensions].getName)
.mkString(", "))
.set(OPTIMIZER_RULE_ENABLED.key, "false")
conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,15 @@ class FlintSparkPPLCorrelationITSuite
Row(70000.0, "Canada", 50L),
Row(95000.0, "USA", 40L))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](2))
// Define ordering for rows that first compares by age then by name
implicit val rowOrdering: Ordering[Row] = new Ordering[Row] {
def compare(x: Row, y: Row): Int = {
val ageCompare = x.getAs[Long](2).compareTo(y.getAs[Long](2))
if (ageCompare != 0) ageCompare
else x.getAs[String](1).compareTo(y.getAs[String](1))
}
}

// Compare the results
assert(results.sorted.sameElements(expectedResults.sorted))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,16 @@ class FlintSparkPPLTimeWindowITSuite
"prod3",
Timestamp.valueOf("2023-05-03 17:00:00"),
Timestamp.valueOf("2023-05-04 17:00:00")))
// Compare the results
implicit val timestampOrdering: Ordering[Timestamp] = new Ordering[Timestamp] {
def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)

// Define ordering for rows that first compares by the timestamp and then by the productId
implicit val rowOrdering: Ordering[Row] = new Ordering[Row] {
def compare(x: Row, y: Row): Int = {
val dateCompare = x.getAs[Timestamp](2).compareTo(y.getAs[Timestamp](2))
if (dateCompare != 0) dateCompare
else x.getAs[String](1).compareTo(y.getAs[String](1))
}
}

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](2))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
Expand Down

0 comments on commit ad543b4

Please sign in to comment.