Add recover index job statement (#119)
* Add recover command syntax and API

* Refactor IT with assertion helper

* Add IT for MV

* Update user manual and add test for backticks

* Add more logging and IT on FlintSpark API layer

* Reformat sql text in IT

* Add recovering transient state

* Detect streaming job state in update task

* Address PR comment
---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Nov 6, 2023
1 parent 564b6c0 commit fda5dad
Showing 10 changed files with 394 additions and 10 deletions.
14 changes: 14 additions & 0 deletions docs/index.md
@@ -262,6 +262,20 @@ WITH (
)
```

### Index Job Management

Currently, the Flint index job ID is the same as the internal Flint index name described in the [OpenSearch](./index.md#OpenSearch) section below.

```sql
RECOVER INDEX JOB <id>
```

Example:

```sql
RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`
```
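
A rough end-to-end sketch of issuing this statement from a Spark application is shown below; the `spark.sql.extensions` class name and session configuration are assumptions that may differ per deployment.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the extension class name and any OpenSearch connection settings are
// assumptions; the statement itself matches the syntax documented above.
val spark = SparkSession.builder()
  .appName("flint-recover-sql")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

spark.sql("RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`")
```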

## Index Store

### OpenSearch
@@ -74,6 +74,7 @@ object FlintMetadataLogEntry {
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val RECOVERING: IndexState.Value = Value("recovering")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
@@ -18,6 +18,7 @@ statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
| indexJobManagementStatement
;

skippingIndexStatement
@@ -109,6 +110,14 @@ dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

indexJobManagementStatement
: recoverIndexJobStatement
;

recoverIndexJobStatement
: RECOVER INDEX JOB identifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -165,10 +165,12 @@ IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
JOB: 'JOB';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
RECOVER: 'RECOVER';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
@@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

import scala.collection.JavaConverters._

@@ -47,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer

/** Scheduler for updating index state regularly as needed, such as incremental refreshing */
private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

/**
* Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
@@ -113,8 +113,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
if (latest == null) { // in case transaction capability is disabled
flintClient.createIndex(indexName, metadata)
} else {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
@@ -146,9 +148,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
if (mode == FULL) {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
} else {
// Schedule regular update and return log entry as refreshing state
logInfo("Scheduling index state updater")
scheduleIndexStateUpdate(indexName)
latest
}
@@ -229,6 +233,41 @@ class FlintSpark(val spark: SparkSession) extends Logging {
throw new IllegalStateException("Failed to delete Flint index")
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest => latest.copy(state = RECOVERING))
.finalLog(latest => {
scheduleIndexStateUpdate(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
false
}
}
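
Recovery only proceeds when the index exists and was created with auto refresh: the transaction accepts a log entry in active, refreshing, or failed state, passes through the transient recovering state, and lands back in refreshing once the streaming job is restarted. A hedged sketch of calling this API directly (session configuration and index name are placeholders):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark

// Sketch only: assumes a SparkSession already configured with the Flint/OpenSearch
// connection settings and spark.flint.datasource.name; the index name is a placeholder.
object RecoverIndexJobExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("flint-recover-api").getOrCreate()
    val flint = new FlintSpark(spark)

    // true when the index exists, is auto-refresh, and its streaming job was restarted
    val recovered = flint.recoverIndex("flint_spark_catalog_default_test_skipping_index")
    println(s"Recovered: $recovered")
  }
}
```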
@@ -249,15 +288,28 @@ class FlintSpark(val spark: SparkSession) extends Logging {
spark.streams.active.exists(_.name == indexName)

private def scheduleIndexStateUpdate(indexName: String): Unit = {
executor.scheduleAtFixedRate(
var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel()
task = executor.scheduleAtFixedRate(
() => {
logInfo("Scheduler triggers index log entry update")
logInfo(s"Scheduler triggers index log entry update for $indexName")
try {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(latest => logInfo("Updating log entry to " + latest))
if (isIncrementalRefreshing(indexName)) {
logInfo("Streaming job is still active")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
} else {
logError("Streaming job is not active. Cancelling update task")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
task.cancel(true)
logInfo("Update task is cancelled")
}
} catch {
case e: Exception =>
logError("Failed to update index log entry", e)
@@ -274,6 +326,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
index: FlintSparkIndex,
indexName: String,
mode: RefreshMode): Option[String] = {
logInfo(s"Refreshing index $indexName in $mode mode")
val options = index.options
val tableName = index.metadata().source

@@ -288,17 +341,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.save(indexName)
}

mode match {
val jobId = mode match {
case FULL if isIncrementalRefreshing(indexName) =>
throw new IllegalStateException(
s"Index $indexName is incremental refreshing and cannot be manual refreshed")

case FULL =>
logInfo("Start refreshing index in batch style")
batchRefresh()
None

// Flint index has specialized logic and capability for incremental refresh
case INCREMENTAL if index.isInstanceOf[StreamingRefresh] =>
logInfo("Start refreshing index in streaming style")
val job =
index
.asInstanceOf[StreamingRefresh]
@@ -313,6 +368,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {

// Otherwise, fall back to foreachBatch + batch refresh
case INCREMENTAL =>
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(tableName)
Expand All @@ -325,6 +381,9 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.start()
Some(job.id.toString)
}

logInfo("Refresh index complete")
jobId
}

private def stopRefreshingJob(indexName: String): Unit = {
@@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

@@ -25,6 +26,7 @@ class FlintSparkSqlAstBuilder
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with FlintSparkIndexJobAstBuilder
with SparkSqlAstBuilder {

override def visit(tree: ParseTree): LogicalPlan = {
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.job

import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.RecoverIndexJobStatementContext

import org.apache.spark.sql.catalyst.plans.logical.Command

/**
* Flint Spark AST builder that builds Spark command for Flint index job management statement.
*/
trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitRecoverIndexJobStatement(ctx: RecoverIndexJobStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val jobId = ctx.identifier().getText
flint.recoverIndex(jobId)
Seq.empty
}
}
}
@@ -6,6 +6,7 @@
package org.opensearch.flint

import java.util.Collections
import java.util.concurrent.ScheduledExecutorService

import scala.collection.JavaConverters.mapAsScalaMapConverter

@@ -20,6 +21,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.flint.core.storage.FlintOpenSearchClient._
import org.opensearch.flint.spark.FlintSparkSuite
import org.scalatestplus.mockito.MockitoSugar.mock

/**
* Transaction test base suite that creates the metadata log index which enables transaction
@@ -33,6 +35,9 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set("spark.flint.datasource.name", testDataSourceName)

// Replace executor to avoid impact on IT
flint.executor = mock[ScheduledExecutorService]
}
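
Replacing the executor with a mock means `scheduleAtFixedRate` is accepted but never runs anything, so the background index-state updater cannot race the integration-test assertions. A minimal illustration of that behaviour using plain Mockito (interval values are placeholders):

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.mockito.Mockito.mock

// Sketch only: an unstubbed mock records the scheduling call, never executes the
// task, and returns null where a real executor would return a ScheduledFuture.
val executor: ScheduledExecutorService = mock(classOf[ScheduledExecutorService])
val future = executor.scheduleAtFixedRate(() => println("never runs"), 0, 1, TimeUnit.SECONDS)
assert(future == null)
```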

override def beforeEach(): Unit = {