Commit

Remove integ test in favor of fast follow
engechas committed Nov 20, 2024
1 parent 264ab07 commit efeac8a
Showing 2 changed files with 1 addition and 86 deletions.
@@ -445,31 +445,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| time TIMESTAMP,
| name STRING,
| age INT,
| address STRING,
| mymap MAP<STRING, STRING>
| )
| USING ICEBERG
|""".stripMargin)

sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 'Seattle', Map('mapkey1', 'mapvalue1'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 'Seattle', Map('mapkey2', 'mapvalue2'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 'Portland', Map('mapkey3', 'mapvalue3'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland', Map('mapkey4', 'mapvalue4'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver', Map('mapkey5', 'mapvalue5'))")
}

protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
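The helper removed above populated an Iceberg table whose MAP<STRING, STRING> column is built with Spark SQL's Map(key, value, ...) constructor, the same syntax that appears verbatim in the deleted INSERT statements. A minimal standalone sketch of that pattern, assuming a Spark session launched with the Iceberg runtime; the "demo" catalog and table names are illustrative, not from this repository:

import org.apache.spark.sql.SparkSession

object IcebergMapColumnSketch {
  def main(args: Array[String]): Unit = {
    // Assumes Spark was started with the Iceberg runtime jar and a catalog
    // named "demo" configured; all names below are illustrative.
    val spark = SparkSession.builder().appName("iceberg-map-sketch").getOrCreate()

    spark.sql("""
      CREATE TABLE demo.default.map_demo (
        time TIMESTAMP,
        name STRING,
        mymap MAP<STRING, STRING>
      ) USING ICEBERG
    """)

    // Spark SQL's Map(key, value, ...) constructor, as used by the removed helper.
    spark.sql(
      "INSERT INTO demo.default.map_demo VALUES " +
        "(TIMESTAMP '2023-10-01 00:01:00', 'A', Map('mapkey1', 'mapvalue1'))")

    // Map values are read back with bracket syntax.
    spark.sql("SELECT name, mymap['mapkey1'] FROM demo.default.map_demo").show()
  }
}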
@@ -6,67 +6,7 @@
 package org.opensearch.flint.spark.iceberg
 
 import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
-import org.scalatest.matchers.must.Matchers.defined
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIcebergMaterializedViewITSuite
     extends FlintSparkMaterializedViewSqlITSuite
-    with FlintSparkIcebergSuite {
-
-  private val icebergTestTable = s"$catalogName.default.mv_test_iceberg"
-  private val icebergTestMvName = s"$catalogName.default.mv_test_iceberg_metrics"
-  private val icebergTestFlintIndex = getFlintIndexName(icebergTestMvName)
-  private val icebergTestQuery =
-    s"""
-       | SELECT
-       |   mymap
-       | FROM $icebergTestTable
-       |""".stripMargin
-
-  override def beforeEach(): Unit = {
-    super.beforeAll()
-    createIcebergTimeSeriesTable(icebergTestTable)
-  }
-
-  override def afterEach(): Unit = {
-    super.afterEach()
-    deleteTestIndex(icebergTestFlintIndex)
-    sql(s"DROP TABLE $icebergTestTable")
-    conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
-    conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
-  }
-
-  test("create materialized view with map type") {
-    withTempDir { checkpointDir =>
-      sql(s"""
-           | CREATE MATERIALIZED VIEW $icebergTestMvName
-           | AS $icebergTestQuery
-           | WITH (
-           |   auto_refresh = true,
-           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
-           | )
-           |""".stripMargin)
-
-      // Wait for streaming job complete current micro batch
-      val job = spark.streams.active.find(_.name == icebergTestFlintIndex)
-      job shouldBe defined
-      failAfter(streamingTimeout) {
-        job.get.processAllAvailable()
-      }
-
-      flint.describeIndex(icebergTestFlintIndex) shouldBe defined
-      checkAnswer(
-        flint.queryIndex(icebergTestFlintIndex).select("mymap"),
-        Seq(
-          Row(Row("mapvalue2", null, null, null, null)),
-          Row(Row(null, "mapvalue5", null, null, null)),
-          Row(Row(null, null, "mapvalue3", null, null)),
-          Row(Row(null, null, null, "mapvalue4", null)),
-          Row(Row(null, null, null, null, "mapvalue1"))))
-    }
-  }
-}
+    with FlintSparkIcebergSuite {}
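For reference, the essence of the deleted test as a standalone sketch. The CREATE MATERIALIZED VIEW ... WITH (auto_refresh, checkpoint_location) statement is Flint SQL taken from the deleted test body; the session setup, "demo" catalog, object names, and checkpoint path below are illustrative assumptions:

import org.apache.spark.sql.SparkSession

object RemovedMvTestSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the Flint Spark extensions are on the classpath and enabled;
    // CREATE MATERIALIZED VIEW is Flint SQL syntax, not vanilla Spark SQL.
    val spark = SparkSession.builder().appName("flint-mv-sketch").getOrCreate()

    // Mirrors the deleted test's DDL; names and checkpoint path are illustrative.
    spark.sql("""
      CREATE MATERIALIZED VIEW demo.default.mv_test_iceberg_metrics
      AS SELECT mymap FROM demo.default.mv_test_iceberg
      WITH (
        auto_refresh = true,
        checkpoint_location = '/tmp/mv_test_iceberg_checkpoint'
      )
    """)

    // auto_refresh = true backs the MV with a streaming job; as in the deleted
    // test, drain the current micro-batch before querying the Flint index.
    spark.streams.active.foreach(_.processAllAvailable())
  }
}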
