Fix broken IT and update user manual
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jan 18, 2024
1 parent 59779b2 commit 9cabe09
Showing 4 changed files with 2 additions and 29 deletions.
docs/index.md (4 changes: 0 additions & 4 deletions)
@@ -515,10 +515,6 @@ CREATE INDEX Idx_elb ON alb_logs ...

For now, only single or conjunct conditions (conditions connected by AND) in WHERE clause can be optimized by skipping index.

-### Index Refresh Job Management
-
-Manual refreshing a table which already has skipping index being auto-refreshed, will be prevented. However, this assumption relies on the condition that the incremental refresh job is actively running in the same Spark cluster, which can be identified when performing the check.
-
## Integration

### AWS EMR Spark Integration - Using execution role
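For context on the retained doc line about WHERE clauses, here is a minimal Scala sketch: the `alb_logs` table comes from the surrounding doc example, while the `year`/`month` columns, the skipping index on them, and the SparkSession `spark` are assumed for illustration. A single predicate or predicates joined by AND can be optimized by the skipping index, whereas OR-joined predicates are not optimized for now.

// Illustrative only; assumes Flint's Spark extensions are enabled and a skipping
// index exists on the hypothetical `year` and `month` columns of `alb_logs`.
val optimized = spark.sql(
  "SELECT * FROM alb_logs WHERE year = 2023 AND month = 1") // single/AND-only predicates: eligible for skipping
val notOptimized = spark.sql(
  "SELECT * FROM alb_logs WHERE year = 2023 OR month = 1") // OR-joined predicates: not optimized for now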
@@ -320,9 +320,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    spark.read.format(FLINT_DATASOURCE).load(indexName)
  }

-  private def isIncrementalRefreshing(indexName: String): Boolean =
-    spark.streams.active.exists(_.name == indexName)
-
  // TODO: move to separate class
  private def doRefreshIndex(
      index: FlintSparkIndex,
@@ -344,10 +341,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    }

    val jobId = mode match {
-      case MANUAL if isIncrementalRefreshing(indexName) =>
-        throw new IllegalStateException(
-          s"Index $indexName is incremental refreshing and cannot be manual refreshed")
-
      case MANUAL =>
        logInfo("Start refreshing index in batch style")
        batchRefresh()
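The guard removed above is the check referenced by the deleted doc section: it relied on the incremental refresh job running as a Structured Streaming query in the same Spark cluster. A minimal standalone sketch of that kind of check follows; the helper name `isRefreshJobActive` is made up here, while `SparkSession.streams.active` is the standard Spark API the deleted code used.

import org.apache.spark.sql.SparkSession

// StreamingQueryManager.active lists only queries started by this SparkSession, so an
// auto-refresh job running on another cluster would not be detected by this check.
def isRefreshJobActive(spark: SparkSession, indexName: String): Boolean =
  spark.streams.active.exists(_.name == indexName)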
@@ -96,6 +96,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
      .name(testIndex)
      .onTable(testTable)
      .addIndexColumns("name", "age")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
      .create()

    val jobId = flint.refreshIndex(testFlintIndex)
@@ -173,6 +173,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
      .skippingIndex()
      .onTable(testTable)
      .addPartitions("year", "month")
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
      .create()

    val jobId = flint.refreshIndex(testIndex)
@@ -187,24 +188,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
    indexData should have size 2
  }

-  test("should fail to manual refresh an incremental refreshing index") {
-    flint
-      .skippingIndex()
-      .onTable(testTable)
-      .addPartitions("year", "month")
-      .create()
-
-    val jobId = flint.refreshIndex(testIndex)
-    val job = spark.streams.get(jobId.get)
-    failAfter(streamingTimeout) {
-      job.processAllAvailable()
-    }
-
-    assertThrows[IllegalStateException] {
-      flint.refreshIndex(testIndex)
-    }
-  }
-
  test("can have only 1 skipping index on a table") {
    flint
      .skippingIndex()
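Pieced together from the test lines in this diff (the `flint`, `testTable`, `testIndex`, and `streamingTimeout` fixtures are assumed to come from FlintSparkSuite, as in the suites above), the flow the updated tests exercise is: create the index with the auto_refresh option, then wait for the streaming job that refreshIndex starts.

flint
  .skippingIndex()
  .onTable(testTable)
  .addPartitions("year", "month")
  .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
  .create()

// Per the test code above, refreshIndex returns an Option holding the streaming query id.
val jobId = flint.refreshIndex(testIndex)
val job = spark.streams.get(jobId.get)
failAfter(streamingTimeout) {
  job.processAllAvailable() // block until the incremental refresh catches up
}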
