Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Alter Index SQL statement #286

Merged
merged 45 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
d4fad7b
FlintClient api for update index mapping
seankao-az Mar 13, 2024
87cb34c
FlintSpark api for update index
seankao-az Mar 13, 2024
41365b6
visitPropertyList return map, not IndexOptions
seankao-az Mar 13, 2024
dc3652a
change FlintSpark update index interface
seankao-az Mar 14, 2024
3ef3884
cancelIndex api and transaction test
seankao-az Mar 14, 2024
ffcf6ac
AST builder for alter skipping index
seankao-az Mar 14, 2024
50c47ef
refactor
seankao-az Mar 14, 2024
d384d2a
more test cases for update skipping index
seankao-az Mar 14, 2024
2a353ba
refactor and resolve race condition
seankao-az Mar 15, 2024
08bb315
alter covering index; sanity test
seankao-az Mar 15, 2024
dcb0b10
remove outdated comment
seankao-az Mar 15, 2024
dd2f7b1
update covering index suite
seankao-az Mar 15, 2024
3c2109d
alter mv; sanity test
seankao-az Mar 15, 2024
b69e2c9
Merge branch 'main' into alter-index
seankao-az Mar 15, 2024
c9c81d9
rename var name in AST builder for clarity
seankao-az Mar 15, 2024
676878d
fix mv sql test
seankao-az Mar 15, 2024
af89e30
validate should update auto_refresh option
seankao-az Mar 16, 2024
ebbe5c7
decide update mode in AST builder
seankao-az Mar 16, 2024
9453d03
move UpdateMode to FlintSpark
seankao-az Mar 16, 2024
bedbe76
move update test suite into its own class
seankao-az Mar 16, 2024
fff4383
update documentation
seankao-az Mar 16, 2024
e5c1af7
Merge branch 'main' into alter-index
seankao-az Mar 19, 2024
af4da21
update validation rule
seankao-az Mar 19, 2024
a965229
remove unnecessary log
seankao-az Mar 19, 2024
80764d5
fix test code & comment
seankao-az Mar 19, 2024
52a34d6
fix test cases for update option validation
seankao-az Mar 19, 2024
2101d34
test case for update options validation
seankao-az Mar 19, 2024
0762842
scalafmtAll
seankao-az Mar 20, 2024
7c121d5
refactor: client update index
seankao-az Mar 23, 2024
7b8bb94
refactor: move update index logic to flintspark
seankao-az Mar 23, 2024
05ebd93
refactor: rename updateIndexMapping in rest client
seankao-az Mar 23, 2024
63a7236
refactors
seankao-az Mar 23, 2024
a5cfe32
refactor: move validation to Options class
seankao-az Mar 24, 2024
de733b3
refactor: Builder for index copy with update
seankao-az Mar 24, 2024
363e04b
scalafmtAll
seankao-az Mar 24, 2024
a30a170
move validation to FlintSpark
seankao-az Mar 25, 2024
495acaa
add test case for race update; will fail
seankao-az Mar 26, 2024
6495de7
solve update index race condition
seankao-az Mar 26, 2024
d610156
refactor: rename in test cases
seankao-az Mar 26, 2024
27a5357
refactor: parameterize failure test cases
seankao-az Mar 26, 2024
66627bd
scalafmtAll
seankao-az Mar 26, 2024
8549f4d
refactor: parameterize success test cases
seankao-az Mar 27, 2024
25cb0ff
use new tempDir for checkpoint in every test case
seankao-az Mar 27, 2024
1247eaf
Merge branch 'main' into alter-index
seankao-az Mar 27, 2024
ccb76f0
fix error after merging main
seankao-az Mar 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ WITH ( options )
REFRESH SKIPPING INDEX ON <object>

[DESC|DESCRIBE] SKIPPING INDEX ON <object>

ALTER SKIPPING INDEX
ON <object>
WITH ( options )

DROP SKIPPING INDEX ON <object>

Expand Down Expand Up @@ -212,6 +216,9 @@ REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

ALTER SKIPPING INDEX ON alb_logs
WITH ( auto_refresh = false )

DROP SKIPPING INDEX ON alb_logs

VACUUM SKIPPING INDEX ON alb_logs
Expand All @@ -231,6 +238,9 @@ SHOW [INDEX|INDEXES] ON <object>

[DESC|DESCRIBE] INDEX name ON <object>

ALTER INDEX name ON <object>
WITH ( options )

DROP INDEX name ON <object>

VACUUM INDEX name ON <object>
Expand All @@ -248,6 +258,9 @@ SHOW INDEX ON alb_logs

DESCRIBE INDEX elb_and_requestUri ON alb_logs

ALTER INDEX elb_and_requestUri ON alb_logs
WITH ( auto_refresh = false )

DROP INDEX elb_and_requestUri ON alb_logs

VACUUM INDEX elb_and_requestUri ON alb_logs
Expand All @@ -266,6 +279,9 @@ SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name

ALTER MATERIALIZED VIEW name
WITH ( options )

DROP MATERIALIZED VIEW name

VACUUM MATERIALIZED VIEW name
Expand All @@ -288,6 +304,9 @@ SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics

ALTER MATERIALIZED VIEW alb_logs_metrics
WITH ( auto_refresh = false )

DROP MATERIALIZED VIEW alb_logs_metrics

VACUUM MATERIALIZED VIEW alb_logs_metrics
Expand Down Expand Up @@ -354,7 +373,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}'
Expand Down Expand Up @@ -385,6 +404,15 @@ WITH (
)
```

#### Alter Index Options

User can provide the following options in `WITH` clause of alter statement:
+ `auto_refresh`: This is required for alter statement. Currently, we restrict that an alter statement must change the auto refresh option from its original value.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the expectation of no change? it should be no-op, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query will fail with "error": "Fail to run query, cause: auto_refresh option must be updated"
For example, if user make a query to change index refresh_interval without changing the auto_refresh option, will get this error

+ `refresh_interval`
+ `incremental_refresh`
+ `checkpoint_location`
+ `watermark_delay`

### Index Job Management

Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.metrics.MetricsUtil;
Expand All @@ -44,6 +45,8 @@ public interface IRestHighLevelClient extends Closeable {

CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException;

void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException;

void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException;

DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;

import java.io.IOException;

Expand Down Expand Up @@ -63,6 +64,11 @@ public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, Re
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
}

@Override
public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options));
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Update a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void updateIndex(String indexName, FlintMetadata metadata);

/**
* Delete a Flint index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ object FlintMetadataLogEntry {
val CREATING: IndexState.Value = Value("creating")
val ACTIVE: IndexState.Value = Value("active")
val REFRESHING: IndexState.Value = Value("refreshing")
val UPDATING: IndexState.Value = Value("updating")
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -191,6 +192,19 @@ public FlintMetadata getIndexMetadata(String indexName) {
}
}

@Override
public void updateIndex(String indexName, FlintMetadata metadata) {
LOG.info("Updating Flint index " + indexName + " with metadata " + metadata);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
PutMappingRequest request = new PutMappingRequest(osIndexName);
request.source(metadata.getContent(), XContentType.JSON);
client.updateIndexMapping(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to update Flint index " + osIndexName, e);
}
}

@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,36 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
* Update the given index with metadata and update associated job.
*
* @param index
* Flint index to update
* @param updateMode
* update mode
* @return
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name
if (flintClient.exists(indexName)) {
try {
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
} else {
throw new IllegalStateException(s"Flint index $indexName doesn't exist")
}
}

/**
* Delete index and refreshing job associated.
*
Expand Down Expand Up @@ -354,4 +384,40 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logWarning("Refreshing job not found")
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
.transientLog(latest => latest.copy(state = UPDATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => {
flintClient.updateIndex(indexName, index.metadata)
logInfo("Update index options complete")
flintIndexMonitor.stopMonitor(indexName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

want to confirm, for Flint, plugin cancel job and move manual, right?

my concern is if stopMonitor will stop heartBeat. but if finalLog write failed, index stuck in refreshing state.

Copy link
Collaborator Author

@seankao-az seankao-az Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes plugin will cancel job and change to manual when alter statement turn off auto_refresh.
plugin will only forward the alter statement which turn on the auto_refresh to spark.

if stopMonitor will stop heartBeat. but if finalLog write failed, index stuck in refreshing state.

In general we do not have a rollback mechanism that undo any actions taken (monitor, refresh job, update OS index). Yes this can happen.

stopRefreshingJob(indexName)
None
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = UPDATING, createTime = System.currentTimeMillis()))
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
.finalLog(latest => {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ => {
flintClient.updateIndex(indexName, index.metadata)
logInfo("Update index options complete")
indexRefresh.start(spark, flintSparkConf)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty

import org.apache.spark.sql.catalog.Column
Expand Down Expand Up @@ -59,6 +61,14 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

def copyWithUpdate(index: FlintSparkIndex, options: FlintSparkIndexOptions): FlintSparkIndex = {
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
val updatedOptions = index.options.update(options)
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
FlintSparkIndexFactory.create(updatedMetadata)
}

/**
* Build method for concrete builder class to implement
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._

/**
* Flint Spark index configurable options.
Expand Down Expand Up @@ -118,6 +119,20 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
map.result()
}

/**
* Merge two FlintSparkIndexOptions. If an option exists in both instances, the value from the
* other instance overwrites the value from this instance.
* @param other
* options to update
* @return
* updated Flint Spark index options
*/
def update(other: FlintSparkIndexOptions): FlintSparkIndexOptions = {
val updatedOptions = FlintSparkIndexOptions(options ++ other.options)
validateUpdateAllowed(other, updatedOptions)
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
updatedOptions
}

private def getOptionValue(name: OptionName): Option[String] = {
options.get(name.toString)
}
Expand All @@ -127,6 +142,47 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
.map(opt => (parse(opt) \ key).extract[Map[String, String]])
.getOrElse(Map.empty)
}

/**
* Validate the index update options are allowed.
* @param other
* options to update
* @param updatedOptions
* the updated options
*/
private def validateUpdateAllowed(
other: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Unit = {
// auto_refresh must change
if (updatedOptions.autoRefresh() == autoRefresh()) {
throw new IllegalArgumentException("auto_refresh option must be updated")
}

val refreshMode = (updatedOptions.autoRefresh(), updatedOptions.incrementalRefresh()) match {
case (true, false) => AUTO
case (false, false) => FULL
case (false, true) => INCREMENTAL
case (true, true) =>
throw new IllegalArgumentException(
"auto_refresh and incremental_refresh options cannot both be true")
}

// validate allowed options depending on refresh mode
val allowedOptions = refreshMode match {
case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
case AUTO | INCREMENTAL =>
Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
}
if (!other.options.keys.forall(allowedOptions.map(_.toString).contains)) {
throw new IllegalArgumentException(
s"Altering index to ${refreshMode} refresh only allows options: ${allowedOptions}")
}
}
}

object FlintSparkIndexOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class FlintSparkMaterializedView(
private def watermark(timeCol: Attribute, child: LogicalPlan) = {
require(
options.watermarkDelay().isDefined,
"watermark delay is required for incremental refresh with aggregation")
"watermark delay is required for auto refresh and incremental refresh with aggregation")

val delay = options.watermarkDelay().get
EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitAlterCoveringIndexStatement(
ctx: AlterCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
.describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val updatedIndex = flint.coveringIndex().copyWithUpdate(index, indexOptions)
flint.updateIndex(updatedIndex)
Seq.empty
}
}

override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
Expand Down
Loading
Loading