Add IT to verify Iceberg metadata changes
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jun 14, 2024
1 parent 77f7973 commit e5c6e10
Showing 4 changed files with 147 additions and 3 deletions.
@@ -14,7 +14,7 @@ import org.opensearch.flint.spark.source.{FlintSparkSourceRelation, FlintSparkSo

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
- import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
+ import org.apache.spark.sql.catalyst.plans.logical.{Call, LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.{qualifyTableName, FlintDataSourceV2}
@@ -33,7 +33,8 @@ class ApplyFlintSparkCoveringIndex(flint: FlintSpark) extends Rule[LogicalPlan]
private val supportedProviders = FlintSparkSourceRelationProvider.getAllProviders(flint.spark)

override def apply(plan: LogicalPlan): LogicalPlan = {
- if (plan.isInstanceOf[V2WriteCommand]) { // TODO: bypass any non-select plan
+ if (plan.isInstanceOf[V2WriteCommand] || plan
+     .isInstanceOf[Call]) { // TODO: bypass any non-select plan
plan
} else {
// Iterate each sub plan tree in the given plan
@@ -88,6 +88,10 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.queryName(indexName)
.addSinkOptions(options, flintSparkConf)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
// Debugging
logInfo(s"New micro batch size: ${batchDF.count()}")
logInfo(s"New micro batch data: ${batchDF.collect().mkString(",")}")

new FullIndexRefresh(indexName, index, Some(batchDF))
.start(spark, flintSparkConf)
() // discard return value above and return unit to use right overridden method
@@ -6,7 +6,39 @@
package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkCoveringIndexSqlITSuite
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.should.Matchers

class FlintSparkIcebergCoveringIndexITSuite
extends FlintSparkCoveringIndexSqlITSuite
- with FlintSparkIcebergSuite {}
+ with FlintSparkIcebergSuite
+ with Matchers {

private val testIcebergTable = "spark_catalog.default.covering_sql_iceberg_test"

test("create covering index on Iceberg struct type") {
sql(s"""
| CREATE TABLE $testIcebergTable (
| status_code STRING,
| src_endpoint STRUCT<ip: STRING, port: INT>
| )
| USING iceberg
|""".stripMargin)
sql(s"INSERT INTO $testIcebergTable VALUES ('200', STRUCT('192.168.0.1', 80))")

val testFlintIndex = getFlintIndexName("all", testIcebergTable)
sql(s"""
| CREATE INDEX all ON $testIcebergTable (
| status_code, src_endpoint
| )
| WITH (
| auto_refresh = true
| )
|""".stripMargin)

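// Wait for the auto-refresh streaming job behind the index to finish the current micro batch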
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)

flint.queryIndex(testFlintIndex).count() shouldBe 1
}
}
@@ -0,0 +1,107 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_COVERING_INDEX_ENABLED

class FlintSparkIcebergMetadataITSuite extends FlintSparkIcebergSuite with Matchers {

private val testIcebergTable = "spark_catalog.default.covering_sql_iceberg_test"
private val testFlintIndex = getFlintIndexName("all", testIcebergTable)

override def beforeAll(): Unit = {
super.beforeAll()
setFlintSparkConf(OPTIMIZER_RULE_COVERING_INDEX_ENABLED, "false")
}

override protected def beforeEach(): Unit = {
super.beforeEach()
sql(s"""
| CREATE TABLE $testIcebergTable (
| time_dt TIMESTAMP,
| status_code STRING,
| action STRING
| )
| USING iceberg
|""".stripMargin)

// v1
sql(
s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-01 00:01:00', '200', 'Accept')")
// v2
sql(
s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-02 00:01:00', '200', 'Accept')")
// v3
sql(
s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-03 00:01:00', '400', 'Reject')")

sql(s"""
| CREATE INDEX all ON $testIcebergTable (
| time_dt, status_code, action
| )
| WITH (
| auto_refresh = true
| )
|""".stripMargin)
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
}

test("data expiration") {
flint.queryIndex(testFlintIndex).count() shouldBe 3

sql(s"""
| CALL spark_catalog.system.expire_snapshots (
| table => 'covering_sql_iceberg_test',
| older_than => 1718222788758
| )
| """.stripMargin)

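// Expiring snapshots only removes old snapshot metadata and unreferenced files,
// so no new micro batch data reaches the index and the count stays the same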
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.queryIndex(testFlintIndex).count() shouldBe 3
}

test("data compaction") {
flint.queryIndex(testFlintIndex).count() shouldBe 3

sql(s"""
| CALL spark_catalog.system.rewrite_data_files (
| table => 'covering_sql_iceberg_test',
| options => map('rewrite-all', true)
| )
| """.stripMargin)

// A new empty micro batch is generated
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.queryIndex(testFlintIndex).count() shouldBe 3
}

test("schema evolution") {
flint.queryIndex(testFlintIndex).count() shouldBe 3

sql(s"""
| ALTER TABLE $testIcebergTable
| ADD COLUMN severity_id INT
| """.stripMargin)

// No new micro batch after schema changed (no new snapshot)
val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.queryIndex(testFlintIndex).count() shouldBe 3

// v4 with new column
sql(
s"INSERT INTO $testIcebergTable VALUES (TIMESTAMP '2023-10-04 00:01:00', '304', 'Accept', 3)")
awaitStreamingComplete(job.get.id.toString)
flint.queryIndex(testFlintIndex).count() shouldBe 4
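// The index schema keeps its original 3 indexed columns; the newly added table column is not indexed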
flint.queryIndex(testFlintIndex).schema.fields.length shouldBe 3
}
}
