Skip to content

Commit

Permalink
Add more SQL IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jan 24, 2024
1 parent 292b600 commit 9cb2d75
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create covering index with manual refresh") {
test("create covering index with full refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
Expand All @@ -151,6 +151,35 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

// Verifies incremental refresh on a covering index: REFRESH must be triggered
// manually, but each run only processes source data added since the previous
// run (state tracked via the checkpoint location — presumably Spark Structured
// Streaming checkpointing; confirm against FlintSparkIndexRefresh).
test("create covering index with incremental refresh") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (
| incremental_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
| """.stripMargin)

// Refresh all present source data as of now
sql(s"REFRESH INDEX $testIndex ON $testTable")
flint.queryIndex(testFlintIndex).count() shouldBe 2

// New data won't be refreshed until refresh statement triggered
sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=5)
| VALUES ('Hello', 50, 'Vancouver')
|""".stripMargin)
flint.queryIndex(testFlintIndex).count() shouldBe 2

// New data is refreshed incrementally (only the inserted row is processed)
sql(s"REFRESH INDEX $testIndex ON $testTable")
flint.queryIndex(testFlintIndex).count() shouldBe 3
}
}

test("create covering index on table without database name") {
sql(s"CREATE INDEX $testIndex ON covering_sql_test (name)")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create materialized view with manual refresh") {
test("create materialized view with full refresh") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
Expand All @@ -146,6 +146,35 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 4
}

// Verifies incremental refresh on a materialized view: each manual REFRESH
// processes only data arrived since the last run. watermark_delay is required
// here because the MV query ($testQuery) appears to aggregate over event time —
// NOTE(review): confirm the delay bound tolerated for late rows against the
// MV definition; a 1-second watermark means later-arriving rows within that
// window are still aggregated on the next refresh.
test("create materialized view with incremental refresh") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| incremental_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| watermark_delay = '1 Second'
| )
| """.stripMargin)

// Refresh all present source data as of now
sql(s"REFRESH MATERIALIZED VIEW $testMvName")
flint.queryIndex(testFlintIndex).count() shouldBe 3

// New data won't be refreshed until refresh statement triggered
sql(s"""
| INSERT INTO $testTable VALUES
| (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')
| """.stripMargin)
flint.queryIndex(testFlintIndex).count() shouldBe 3

// New data is refreshed incrementally
sql(s"REFRESH MATERIALIZED VIEW $testMvName")
flint.queryIndex(testFlintIndex).count() shouldBe 4
}
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
private val testTable = "spark_catalog.default.skipping_sql_test"
private val testIndex = getSkippingIndexName(testTable)

override def beforeAll(): Unit = {
// Recreate the partitioned source table before EACH test, since the
// companion afterEach (below) drops it after every test.
override def beforeEach(): Unit = {
  // FIX: was super.beforeAll() — left over from renaming this hook from
  // beforeAll to beforeEach. Calling super.beforeAll() here would re-run the
  // parent suite's one-time setup before every test and, worse, skip the
  // parent's per-test beforeEach entirely. Chain to the matching hook.
  super.beforeEach()

  createPartitionedMultiRowTable(testTable)
Expand All @@ -37,6 +37,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
super.afterEach()

deleteTestIndex(testIndex)
sql(s"DROP TABLE $testTable")
}

test("create skipping index with auto refresh") {
Expand Down Expand Up @@ -142,7 +143,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create skipping index with manual refresh") {
test("create skipping index with full refresh") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| (
Expand All @@ -161,6 +162,34 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

// Verifies incremental refresh on a skipping index (partition-skipping on
// `year`): manual REFRESH runs pick up only source data added since the
// previous run, with progress tracked via the checkpoint location.
test("create skipping index with incremental refresh") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| ( year PARTITION )
| WITH (
| incremental_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
| """.stripMargin)

// Refresh all present source data as of now
sql(s"REFRESH SKIPPING INDEX ON $testTable")
flint.queryIndex(testIndex).count() shouldBe 2

// New data won't be refreshed until refresh statement triggered
sql(s"""
| INSERT INTO $testTable
| PARTITION (year=2023, month=5)
| VALUES ('Hello', 50, 'Vancouver')
|""".stripMargin)
flint.queryIndex(testIndex).count() shouldBe 2

// New data is refreshed incrementally (comment added for consistency with
// the covering-index and MV incremental-refresh tests in this commit)
sql(s"REFRESH SKIPPING INDEX ON $testTable")
flint.queryIndex(testIndex).count() shouldBe 3
}
}

test("should fail if refresh an auto refresh skipping index") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
(parse(mapping) \ "_meta" \ "latestId").extract[String] shouldBe testLatestId
}

test("manual refresh index") {
test("full refresh index") {
flint
.skippingIndex()
.onTable(testTable)
Expand All @@ -78,6 +78,20 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
}

// Transaction-log check for incremental refresh: after a refresh of an index
// created with incremental_refresh = true, the latest metadata-log entry must
// be back in "active" state with a positive jobStartTime recorded.
// NOTE(review): unlike the SQL IT variants, no checkpoint_location is set
// here — presumably optional for this API path; confirm option validation.
test("incremental refresh index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
.create()
flint.refreshIndex(testFlintIndex)

// latestLogEntry reads the OpenSearch-side transaction log for this index.
val latest = latestLogEntry(testLatestId)
latest should contain("state" -> "active")
// jobStartTime is stored as a JSON number; any positive epoch value passes.
latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L
}

test("auto refresh index") {
flint
.skippingIndex()
.onTable(testTable)
Expand Down

0 comments on commit 9cb2d75

Please sign in to comment.