-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Alter Index SQL statement #286
Changes from all commits
d4fad7b
87cb34c
41365b6
dc3652a
3ef3884
ffcf6ac
50c47ef
d384d2a
2a353ba
08bb315
dcb0b10
dd2f7b1
3c2109d
b69e2c9
c9c81d9
676878d
af89e30
ebbe5c7
9453d03
bedbe76
fff4383
e5c1af7
af4da21
a965229
80764d5
52a34d6
2101d34
0762842
7c121d5
7b8bb94
05ebd93
63a7236
a5cfe32
de733b3
363e04b
a30a170
495acaa
6495de7
d610156
27a5357
66627bd
8549f4d
25cb0ff
1247eaf
ccb76f0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,10 +13,11 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} | |
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ | ||
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY | ||
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN | ||
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ | ||
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex | ||
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView | ||
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh | ||
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO | ||
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ | ||
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex | ||
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer | ||
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy | ||
|
@@ -202,6 +203,39 @@ class FlintSpark(val spark: SparkSession) extends Logging { | |
} | ||
} | ||
|
||
/**
 * Update the given index with its new metadata and switch the associated refresh job between
 * manual and auto refresh, depending on the updated auto_refresh option.
 *
 * @param index
 *   Flint index to update
 * @return
 *   refreshing job ID (empty if no job)
 * @throws IllegalStateException
 *   if the index doesn't exist or the update itself fails
 * @throws IllegalArgumentException
 *   if the updated options are rejected by validation
 */
def updateIndex(index: FlintSparkIndex): Option[String] = {
  logInfo(s"Updating Flint index $index")
  val indexName = index.name

  // Validation happens outside the try block so its exceptions reach the caller unchanged
  val originalOptions = describeIndex(indexName)
    .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
    .options
  validateUpdateAllowed(originalOptions, index.options)

  try {
    // Relies on validation to forbid auto-to-auto and manual-to-manual updates
    if (index.options.autoRefresh()) {
      updateIndexManualToAuto(index)
    } else {
      updateIndexAutoToManual(index)
    }
  } catch {
    case e: Exception =>
      logError("Failed to update Flint index", e)
      // Keep the original exception as the cause so the root failure is not lost
      throw new IllegalStateException("Failed to update Flint index", e)
  }
}
|
||
/** | ||
* Delete index and refreshing job associated. | ||
* | ||
|
@@ -353,4 +387,90 @@ class FlintSpark(val spark: SparkSession) extends Logging { | |
logWarning("Refreshing job not found") | ||
} | ||
} | ||
|
||
/**
 * Check that an index update only changes options permitted for the target refresh mode.
 *
 * The auto_refresh option must differ between the original and the updated options, and
 * auto_refresh and incremental_refresh must not both be true after the update. Every option
 * whose value differs from the original must be in the allowed set for the resulting mode.
 *
 * @param originalOptions
 *   options the index currently has
 * @param updatedOptions
 *   options the index would have after the update
 * @throws IllegalArgumentException
 *   if any of the rules above is violated
 */
private def validateUpdateAllowed(
    originalOptions: FlintSparkIndexOptions,
    updatedOptions: FlintSparkIndexOptions): Unit = {
  // An update is only accepted when it flips auto_refresh
  val targetAutoRefresh = updatedOptions.autoRefresh()
  if (targetAutoRefresh == originalOptions.autoRefresh()) {
    throw new IllegalArgumentException("auto_refresh option must be updated")
  }

  // Derive the refresh mode the index will be in once the update is applied
  val targetIncremental = updatedOptions.incrementalRefresh()
  val targetMode =
    if (targetAutoRefresh && targetIncremental) {
      throw new IllegalArgumentException(
        "auto_refresh and incremental_refresh options cannot both be true")
    } else if (targetAutoRefresh) {
      AUTO
    } else if (targetIncremental) {
      INCREMENTAL
    } else {
      FULL
    }

  // Full refresh may only touch the two refresh switches; the other modes may also tune
  // interval, checkpoint location and watermark delay
  val permittedOptions =
    if (targetMode == FULL) {
      Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
    } else {
      Set(
        AUTO_REFRESH,
        INCREMENTAL_REFRESH,
        REFRESH_INTERVAL,
        CHECKPOINT_LOCATION,
        WATERMARK_DELAY)
    }
  val permittedNames = permittedOptions.map(_.toString)

  // Names of options that are new or whose value differs from the original
  val changedNames = updatedOptions.options.collect {
    case (name, value) if !originalOptions.options.get(name).contains(value) => name
  }
  if (changedNames.exists(name => !permittedNames.contains(name))) {
    throw new IllegalArgumentException(
      s"Altering index to ${targetMode} refresh only allows options: ${permittedOptions}")
  }
}
|
||
/**
 * Switch an index from auto refresh to manual refresh.
 *
 * The transaction only starts if the latest log entry is in REFRESHING state and still matches
 * the seqNo and primaryTerm of the given index's log entry. Inside the transaction the index
 * metadata is updated, then the index monitor and the refreshing job are stopped.
 *
 * @param index
 *   Flint index carrying the updated metadata
 * @return
 *   always empty, since no refreshing job remains
 * @throws IllegalStateException
 *   if the index has no metadata log entry
 */
private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
  val indexName = index.name
  // Fail with a descriptive error instead of a bare NoSuchElementException from Option.get
  val indexLogEntry = index.latestLogEntry.getOrElse(
    throw new IllegalStateException(s"Index $indexName has no metadata log entry"))
  flintClient
    .startTransaction(indexName, dataSourceName)
    .initialLog(latest =>
      latest.state == REFRESHING &&
        latest.seqNo == indexLogEntry.seqNo &&
        latest.primaryTerm == indexLogEntry.primaryTerm)
    .transientLog(latest => latest.copy(state = UPDATING))
    .finalLog(latest => latest.copy(state = ACTIVE))
    .commit(_ => {
      flintClient.updateIndex(indexName, index.metadata)
      logInfo("Update index options complete")
      // NOTE (from code review): there is no rollback mechanism that undoes the actions taken
      // here (monitor, refresh job, index update). If the final log write fails after the
      // monitor has been stopped, the index can be left stuck in the refreshing state.
      flintIndexMonitor.stopMonitor(indexName)
      stopRefreshingJob(indexName)
      None
    })
}
|
||
/**
 * Switch an index from manual refresh to auto refresh.
 *
 * The transaction only starts if the latest log entry is in ACTIVE state and still matches the
 * seqNo and primaryTerm of the given index's log entry. The index monitor is started while the
 * final log entry (REFRESHING) is built; inside the transaction the index metadata is updated
 * and the index refresh is started.
 *
 * @param index
 *   Flint index carrying the updated metadata
 * @return
 *   the result of starting the index refresh
 * @throws IllegalStateException
 *   if the index has no metadata log entry
 */
private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
  val indexName = index.name
  // Fail with a descriptive error instead of a bare NoSuchElementException from Option.get
  val indexLogEntry = index.latestLogEntry.getOrElse(
    throw new IllegalStateException(s"Index $indexName has no metadata log entry"))
  val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
  flintClient
    .startTransaction(indexName, dataSourceName)
    // TODO (from code review): in the future, use another lock to protect the entire update
    // process (and possibly other transactions) rather than relying on checks like this one,
    // which may be prone to error, e.g. when the metadata log is missing.
    .initialLog(latest =>
      latest.state == ACTIVE &&
        latest.seqNo == indexLogEntry.seqNo &&
        latest.primaryTerm == indexLogEntry.primaryTerm)
    .transientLog(latest =>
      latest.copy(state = UPDATING, createTime = System.currentTimeMillis()))
    .finalLog(latest => {
      logInfo("Scheduling index state monitor")
      flintIndexMonitor.startMonitor(indexName)
      latest.copy(state = REFRESHING)
    })
    .commit(_ => {
      flintClient.updateIndex(indexName, index.metadata)
      logInfo("Update index options complete")
      indexRefresh.start(spark, flintSparkConf)
    })
}
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the expected behavior when nothing changes? It should be a no-op, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query will fail with
"error": "Fail to run query, cause: auto_refresh option must be updated"
For example, if a user submits a query that changes the index's refresh_interval without changing the auto_refresh option, they will get this error.