Adding support to run integ tests on iceberg tables #301

Merged
3 commits merged on Apr 17, 2024
build.sbt: 6 additions, 0 deletions
@@ -7,6 +7,10 @@ import Dependencies._
lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
lazy val opensearchVersion = "2.6.0"
+ lazy val icebergVersion = "1.5.0"
+
+ val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
+ val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

@@ -172,6 +176,8 @@ lazy val integtest = (project in file("integ-test"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
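Note on the new Iceberg dependency: `sparkMinorVersion` is derived from `sparkVersion` ("3.3.2" becomes "3.3"), and sbt's `%%` appends the Scala binary version, so the test-scoped runtime artifact should be the Spark 3.3 / Scala 2.12 build of Iceberg 1.5.0. A minimal sketch of how the pieces combine (coordinates inferred from the values above, not copied from actual build output):

```scala
// Sketch only: how the version vals above turn into a concrete artifact name.
val sparkVersion      = "3.3.2"
val icebergVersion    = "1.5.0"
val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".") // "3.3"

// "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion
// should resolve to roughly: org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0 (test scope)
println(s"iceberg-spark-runtime-$sparkMinorVersion") // iceberg-spark-runtime-3.3
```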
@@ -38,7 +38,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
super.beforeAll()
// initialized after the container is started
osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
- createPartitionedMultiRowTable(testTable)
+ createPartitionedMultiRowAddressTable(testTable)
}

protected override def afterEach(): Unit = {
@@ -26,7 +26,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
override def beforeAll(): Unit = {
super.beforeAll()

- createPartitionedTable(testTable)
+ createPartitionedAddressTable(testTable)
}

override def afterEach(): Unit = {
@@ -31,7 +31,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
override def beforeEach(): Unit = {
super.beforeEach()

- createPartitionedTable(testTable)
+ createPartitionedAddressTable(testTable)
}

override def afterEach(): Unit = {
@@ -25,7 +25,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers

override def beforeAll(): Unit = {
super.beforeAll()
- createPartitionedTable(testTable)
+ createPartitionedAddressTable(testTable)
}

override def afterEach(): Unit = {
@@ -30,7 +30,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matchers

override def beforeAll(): Unit = {
super.beforeAll()
- createPartitionedTable(testTable)
+ createPartitionedAddressTable(testTable)

// Replace mock executor with real one and change its delay
val realExecutor = newDaemonThreadPoolScheduledExecutor("flint-index-heartbeat", 1)
@@ -32,21 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

/** Test table and index name */
- private val testTable = "spark_catalog.default.test"
+ private val testTable = "spark_catalog.default.skipping_test"
private val testIndex = getSkippingIndexName(testTable)
private val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes)

override def beforeEach(): Unit = {
super.beforeEach()
- createPartitionedMultiRowTable(testTable)
+ createPartitionedMultiRowAddressTable(testTable)
}

override def afterEach(): Unit = {
- super.afterEach()
-
// Delete all test indices
deleteTestIndex(testIndex)
+ sql(s"DROP TABLE $testTable")
+ super.afterEach()
}

test("create skipping index with metadata successfully") {
@@ -63,7 +62,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""{
| "_meta": {
| "name": "flint_spark_catalog_default_test_skipping_index",
| "name": "flint_spark_catalog_default_skipping_test_skipping_index",
| "version": "${current()}",
| "kind": "skipping",
| "indexedColumns": [
@@ -101,7 +100,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnName": "name",
| "columnType": "string"
| }],
| "source": "spark_catalog.default.test",
| "source": "spark_catalog.default.skipping_test",
| "options": {
| "auto_refresh": "false",
| "incremental_refresh": "false"
@@ -480,7 +479,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
// Table name without database name "default"
val query = sql(s"""
| SELECT name
- | FROM test
+ | FROM skipping_test
| WHERE year = 2023
|""".stripMargin)

@@ -31,7 +31,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
override def beforeEach(): Unit = {
super.beforeAll()

- createPartitionedMultiRowTable(testTable)
+ createPartitionedMultiRowAddressTable(testTable)
}

protected override def afterEach(): Unit = {
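The visible hunks cover only the build dependency and the renamed table helpers; the Spark-side wiring that lets these suites create Iceberg tables is not shown in this diff. As a rough sketch under that caveat, an integ-test SparkSession would typically enable Iceberg with the standard extension and catalog settings (the options below are stock Iceberg configuration, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: enable Iceberg on the default spark_catalog so that test
// helpers can create tables with USING ICEBERG. These are standard Iceberg settings,
// not taken from the suites changed in this PR.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog")
  // "hadoop" keeps table metadata on the local filesystem; "hive" would need a metastore
  .config("spark.sql.catalog.spark_catalog.type", "hadoop")
  .config("spark.sql.catalog.spark_catalog.warehouse", "target/iceberg-warehouse")
  .getOrCreate()

// Example table in the same catalog and database the suites use
spark.sql(
  "CREATE TABLE spark_catalog.default.skipping_test (name STRING, year INT) USING ICEBERG")
```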