Update IT and doc
Signed-off-by: Louis Chu <[email protected]>
noCharger committed Sep 20, 2024
1 parent 2b381ba commit 88a8463
Showing 7 changed files with 25 additions and 10 deletions.
README.md (2 additions, 2 deletions)
@@ -22,8 +22,8 @@ Version compatibility:
  | 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
  | 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
  | 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
- | 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |
- | 0.6.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |
+ | 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.17+ |
+ | 0.6.0 | 11+ | 3.5.1 | 2.12.14 | 2.17+ |

## Flint Extension Usage

docs/index.md (4 additions, 0 deletions)
@@ -426,6 +426,7 @@ WITH (

Users can provide the following options in the `WITH` clause of an alter statement:
+ `auto_refresh`: Required for an alter statement. Currently, an alter statement must change the auto refresh option from its original value.
+ `scheduler_mode`: A mode string (`internal` or `external`) that describes how `auto_refresh` is scheduled. `checkpoint_location` is required for the external scheduler (see the sketch after this list).
+ `refresh_interval`
+ `incremental_refresh`
+ `checkpoint_location`
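As a rough sketch of the alter flow described above, in the same Scala style as the integration tests in this commit (the index name, table name, and checkpoint path are illustrative assumptions, not taken from this commit, and a Spark session's `sql` helper is assumed in scope):

```scala
// Hypothetical sketch: switch an existing covering index to the external
// scheduler. auto_refresh must change from its original value, and
// checkpoint_location is required because scheduler_mode is 'external'.
sql(s"""
     | ALTER INDEX idx_elb ON alb_logs
     | WITH (
     |   auto_refresh = true,
     |   scheduler_mode = 'external',
     |   checkpoint_location = 's3://bucket/checkpoints/'
     | )
     |""".stripMargin)
```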
@@ -524,6 +525,9 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty.
- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty.
- `spark.datasource.flint.customFlintSchedulerClass`: default is empty.
- `spark.flint.job.externalScheduler.enabled`: default is `false`. Enables the external scheduler for Flint auto refresh, so that refresh jobs are scheduled outside of Spark.
- `spark.flint.job.externalScheduler.interval`: default is 5 minutes. A string giving the refresh interval at which the external scheduler triggers index refresh (see the sketch after this list).
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column`: default value is true.
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
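As a rough illustration of how the external-scheduler settings might be wired into a session (the property keys come from the list above; the app name and interval value are arbitrary examples, not defaults from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: enable the external scheduler and widen its trigger interval.
val spark = SparkSession
  .builder()
  .appName("flint-external-scheduler-example")
  .config("spark.flint.job.externalScheduler.enabled", "true")
  .config("spark.flint.job.externalScheduler.interval", "10 minutes")
  .getOrCreate()
```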
IntervalSchedulerParser.java
@@ -12,8 +12,6 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;

- import static org.opensearch.flint.core.logging.CustomLogging.logInfo;
-
public class IntervalSchedulerParser {

public static Schedule parse(Object schedule) {
@@ -25,8 +23,6 @@ public static Schedule parse(Object schedule) {
return (Schedule) schedule;
}

- logInfo(schedule.getClass().getSimpleName());
-
if (schedule instanceof scala.Option) {
scala.Option<?> option = (scala.Option<?>) schedule;
if (option.isDefined()) {
FlintSparkCoveringIndexSqlITSuite.scala
@@ -143,7 +143,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

ignore("create covering index with external scheduler") {
test("create covering index with external scheduler") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE INDEX $testIndex ON $testTable
@@ -155,6 +155,11 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
| )
| """.stripMargin)

+ val indexData = flint.queryIndex(testFlintIndex)
+
+ flint.describeIndex(testFlintIndex) shouldBe defined
+ indexData.count() shouldBe 0
+
// Refresh all present source data as of now
sql(s"REFRESH INDEX $testIndex ON $testTable")
flint.queryIndex(testFlintIndex).count() shouldBe 2
FlintSparkIndexValidationITSuite.scala
@@ -90,7 +90,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup

Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
.foreach { statement =>
- ignore(
+ test(
s"should fail to create auto refresh Flint index if scheduler_mode is external and no checkpoint location: $statement") {
withTable(testTable) {
sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
FlintSparkMaterializedViewSqlITSuite.scala
@@ -82,7 +82,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

ignore("create materialized view with auto refresh and external scheduler") {
test("create materialized view with auto refresh and external scheduler") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
@@ -95,6 +95,11 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
| )
| """.stripMargin)

+ val indexData = flint.queryIndex(testFlintIndex)
+
+ flint.describeIndex(testFlintIndex) shouldBe defined
+ indexData.count() shouldBe 0
+
// Refresh all present source data as of now
sql(s"REFRESH MATERIALIZED VIEW $testMvName")
flint.queryIndex(testFlintIndex).count() shouldBe 3
FlintSparkSkippingIndexSqlITSuite.scala
@@ -62,7 +62,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
indexData.count() shouldBe 2
}

ignore("create skipping index with auto refresh and external scheduler") {
test("create skipping index with auto refresh and external scheduler") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
@@ -74,6 +74,11 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
| )
| """.stripMargin)

+ val indexData = flint.queryIndex(testIndex)
+
+ flint.describeIndex(testIndex) shouldBe defined
+ indexData.count() shouldBe 0
+
// Refresh all present source data as of now
sql(s"REFRESH SKIPPING INDEX ON $testTable")
flint.queryIndex(testIndex).count() shouldBe 2
