Implement Alter Index SQL statement (#286)
* FlintClient api for update index mapping

Signed-off-by: Sean Kao <[email protected]>

* FlintSpark api for update index

Signed-off-by: Sean Kao <[email protected]>

* visitPropertyList return map, not IndexOptions

This is to preserve the raw options specified in the user SQL statement.
This is required by the alter index statement.

Signed-off-by: Sean Kao <[email protected]>

* change FlintSpark update index interface

Signed-off-by: Sean Kao <[email protected]>

* cancelIndex api and transaction test

Signed-off-by: Sean Kao <[email protected]>

* AST builder for alter skipping index

Signed-off-by: Sean Kao <[email protected]>

* refactor

Signed-off-by: Sean Kao <[email protected]>

* more test cases for update skipping index

Signed-off-by: Sean Kao <[email protected]>

* refactor and resolve race condition

Signed-off-by: Sean Kao <[email protected]>

* alter covering index; sanity test

Signed-off-by: Sean Kao <[email protected]>

* remove outdated comment

Signed-off-by: Sean Kao <[email protected]>

* update covering index suite

Signed-off-by: Sean Kao <[email protected]>

* alter mv; sanity test

Signed-off-by: Sean Kao <[email protected]>

* rename var name in AST builder for clarity

Signed-off-by: Sean Kao <[email protected]>

* fix mv sql test

Signed-off-by: Sean Kao <[email protected]>

* validate should update auto_refresh option

Signed-off-by: Sean Kao <[email protected]>

* decide update mode in AST builder

Signed-off-by: Sean Kao <[email protected]>

* move UpdateMode to FlintSpark

Signed-off-by: Sean Kao <[email protected]>

* move update test suite into its own class

Signed-off-by: Sean Kao <[email protected]>

* update documentation

Signed-off-by: Sean Kao <[email protected]>

* update validation rule

Signed-off-by: Sean Kao <[email protected]>

* remove unnecessary log

Signed-off-by: Sean Kao <[email protected]>

* fix test code & comment

Signed-off-by: Sean Kao <[email protected]>

* fix test cases for update option validation

Signed-off-by: Sean Kao <[email protected]>

* test case for update options validation

Signed-off-by: Sean Kao <[email protected]>

* scalafmtAll

Signed-off-by: Sean Kao <[email protected]>

* refactor: client update index

Signed-off-by: Sean Kao <[email protected]>

* refactor: move update index logic to flintspark

Signed-off-by: Sean Kao <[email protected]>

* refactor: rename updateIndexMapping in rest client

Signed-off-by: Sean Kao <[email protected]>

* refactors

* Use FlintSparkIndexOptions, instead of options Map
* Define merge operation
* Remove UpdateMode
* Have updateIndexOptions return FlintSparkIndex

Signed-off-by: Sean Kao <[email protected]>

* refactor: move validation to Options class

Signed-off-by: Sean Kao <[email protected]>

* refactor: Builder for index copy with update

Signed-off-by: Sean Kao <[email protected]>

* scalafmtAll

Signed-off-by: Sean Kao <[email protected]>

* move validation to FlintSpark

Signed-off-by: Sean Kao <[email protected]>

* add test case for race update; will fail

Signed-off-by: Sean Kao <[email protected]>

* solve update index race condition

Signed-off-by: Sean Kao <[email protected]>

* refactor: rename in test cases

Signed-off-by: Sean Kao <[email protected]>

* refactor: parameterize failure test cases

Signed-off-by: Sean Kao <[email protected]>

* scalafmtAll

Signed-off-by: Sean Kao <[email protected]>

* refactor: parameterize success test cases

Signed-off-by: Sean Kao <[email protected]>

* use new tempDir for checkpoint in every test case

Signed-off-by: Sean Kao <[email protected]>

* fix error after merging main

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
seankao-az authored Mar 28, 2024
1 parent e87d330 commit a38747f
Showing 21 changed files with 1,153 additions and 6 deletions.
30 changes: 29 additions & 1 deletion docs/index.md
@@ -178,6 +178,10 @@ WITH ( options )
REFRESH SKIPPING INDEX ON <object>

[DESC|DESCRIBE] SKIPPING INDEX ON <object>

ALTER SKIPPING INDEX
ON <object>
WITH ( options )

DROP SKIPPING INDEX ON <object>

@@ -212,6 +216,9 @@ REFRESH SKIPPING INDEX ON alb_logs

DESCRIBE SKIPPING INDEX ON alb_logs

ALTER SKIPPING INDEX ON alb_logs
WITH ( auto_refresh = false )

DROP SKIPPING INDEX ON alb_logs

VACUUM SKIPPING INDEX ON alb_logs
@@ -231,6 +238,9 @@ SHOW [INDEX|INDEXES] ON <object>

[DESC|DESCRIBE] INDEX name ON <object>

ALTER INDEX name ON <object>
WITH ( options )

DROP INDEX name ON <object>

VACUUM INDEX name ON <object>
@@ -248,6 +258,9 @@ SHOW INDEX ON alb_logs

DESCRIBE INDEX elb_and_requestUri ON alb_logs

ALTER INDEX elb_and_requestUri ON alb_logs
WITH ( auto_refresh = false )

DROP INDEX elb_and_requestUri ON alb_logs

VACUUM INDEX elb_and_requestUri ON alb_logs
@@ -266,6 +279,9 @@ SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name

ALTER MATERIALIZED VIEW name
WITH ( options )

DROP MATERIALIZED VIEW name

VACUUM MATERIALIZED VIEW name
@@ -288,6 +304,9 @@ SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics

ALTER MATERIALIZED VIEW alb_logs_metrics
WITH ( auto_refresh = false )

DROP MATERIALIZED VIEW alb_logs_metrics

VACUUM MATERIALIZED VIEW alb_logs_metrics
@@ -354,7 +373,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro batch will be generated as soon as the previous one completes processing.
+ `incremental_refresh`: default value is false. Incrementally refresh the index if set to true; otherwise, fully refresh the entire index. This is only applicable when auto refresh is disabled.
+ `checkpoint_location`: a string as the location path for the refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used, which may result in checkpoint data loss upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by auto and incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
+ `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
+ `extra_options`: a JSON string as extra options that can be passed to the Spark streaming source and sink API directly. Use the qualified source table name (because there could be multiple source tables) and "sink" as keys, e.g. '{"sink": "{key: val}", "table1": {key: val}}'. A combined example sketch follows below.
@@ -385,6 +404,15 @@ WITH (
)
```
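
As a concrete illustration of how several of these options compose, here is a minimal sketch. It reuses the index and table names from the examples in this document, the column list is inferred from the index name, and the checkpoint path and settings values are hypothetical placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: assumes the Flint Spark SQL extension and the OpenSearch
// connection settings are already configured on the running session.
val spark = SparkSession.builder().getOrCreate()

// Hypothetical covering index creation combining several WITH options.
spark.sql("""
  CREATE INDEX elb_and_requestUri ON alb_logs ( elb, requestUri )
  WITH (
    auto_refresh = true,
    refresh_interval = '1 minute',
    checkpoint_location = 's3://bucket/path/to/checkpoint',
    index_settings = '{"number_of_shards": 2, "number_of_replicas": 1}'
  )
""")
```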

#### Alter Index Options

User can provide the following options in the `WITH` clause of an alter statement (see the example sketch after this list):
+ `auto_refresh`: This option is required in an alter statement. Currently, an alter statement must change the auto refresh option from its original value.
+ `refresh_interval`
+ `incremental_refresh`
+ `checkpoint_location`
+ `watermark_delay`
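
The following minimal sketch shows how these alter options might be exercised end to end. It assumes a Spark session with the Flint SQL extension already configured and an existing auto-refresh skipping index on the `alb_logs` table used elsewhere in this document; the checkpoint path is a hypothetical placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Assumes the Flint Spark SQL extension and OpenSearch connection settings
// are already configured on the running session.
val spark = SparkSession.builder().getOrCreate()

// Auto refresh -> manual (full) refresh: only auto_refresh and
// incremental_refresh may be changed in this direction.
spark.sql("""
  ALTER SKIPPING INDEX ON alb_logs
  WITH ( auto_refresh = false )
""")

// Manual -> auto refresh: refresh_interval, checkpoint_location and
// watermark_delay may be updated together with auto_refresh.
spark.sql("""
  ALTER SKIPPING INDEX ON alb_logs
  WITH (
    auto_refresh = true,
    refresh_interval = '10 seconds',
    checkpoint_location = 's3://bucket/path/to/checkpoint'
  )
""")
```

Note that an alter statement that leaves `auto_refresh` at its current value, or that sets both `auto_refresh` and `incremental_refresh` to true, is rejected by option validation.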

### Index Job Management

Currently the Flint index job ID is the same as the internal Flint index name described in the [OpenSearch](./index.md#OpenSearch) section below.
@@ -25,6 +25,7 @@
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.metrics.MetricsUtil;
@@ -44,6 +45,8 @@ public interface IRestHighLevelClient extends Closeable {

CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, RequestOptions options) throws IOException;

void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException;

void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException;

DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException;
@@ -27,6 +27,7 @@
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;

import java.io.IOException;

@@ -63,6 +64,11 @@ public CreateIndexResponse createIndex(CreateIndexRequest createIndexRequest, Re
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().create(createIndexRequest, options));
}

@Override
public void updateIndexMapping(PutMappingRequest putMappingRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().putMapping(putMappingRequest, options));
}

@Override
public void deleteIndex(DeleteIndexRequest deleteIndexRequest, RequestOptions options) throws IOException {
execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.indices().delete(deleteIndexRequest, options));
@@ -71,6 +71,14 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Update a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void updateIndex(String indexName, FlintMetadata metadata);

/**
* Delete a Flint index.
*
@@ -81,6 +81,7 @@ object FlintMetadataLogEntry {
val CREATING: IndexState.Value = Value("creating")
val ACTIVE: IndexState.Value = Value("active")
val REFRESHING: IndexState.Value = Value("refreshing")
val UPDATING: IndexState.Value = Value("updating")
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
@@ -36,6 +36,7 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
@@ -191,6 +192,19 @@ public FlintMetadata getIndexMetadata(String indexName) {
}
}

@Override
public void updateIndex(String indexName, FlintMetadata metadata) {
LOG.info("Updating Flint index " + indexName + " with metadata " + metadata);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
PutMappingRequest request = new PutMappingRequest(osIndexName);
request.source(metadata.getContent(), XContentType.JSON);
client.updateIndexMapping(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to update Flint index " + osIndexName, e);
}
}

@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
@@ -13,10 +13,11 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
@@ -202,6 +203,39 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
* Update the given index with metadata and update the associated job.
*
* @param index
* Flint index to update
* @return
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

/**
* Delete the index and its associated refreshing job.
*
@@ -353,4 +387,90 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logWarning("Refreshing job not found")
}
}

/**
* Validate that the index update options are allowed.
* @param originalOptions
* original options
* @param updatedOptions
* the updated options
*/
private def validateUpdateAllowed(
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Unit = {
// auto_refresh must change
if (updatedOptions.autoRefresh() == originalOptions.autoRefresh()) {
throw new IllegalArgumentException("auto_refresh option must be updated")
}

val refreshMode = (updatedOptions.autoRefresh(), updatedOptions.incrementalRefresh()) match {
case (true, false) => AUTO
case (false, false) => FULL
case (false, true) => INCREMENTAL
case (true, true) =>
throw new IllegalArgumentException(
"auto_refresh and incremental_refresh options cannot both be true")
}

// validate allowed options depending on refresh mode
val allowedOptionNames = refreshMode match {
case FULL => Set(AUTO_REFRESH, INCREMENTAL_REFRESH)
case AUTO | INCREMENTAL =>
Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
}

// Get the changed option names
val updateOptionNames = updatedOptions.options.filterNot { case (k, v) =>
originalOptions.options.get(k).contains(v)
}.keys
if (!updateOptionNames.forall(allowedOptionNames.map(_.toString).contains)) {
throw new IllegalArgumentException(
s"Altering index to ${refreshMode} refresh only allows options: ${allowedOptionNames}")
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => {
flintClient.updateIndex(indexName, index.metadata)
logInfo("Update index options complete")
flintIndexMonitor.stopMonitor(indexName)
stopRefreshingJob(indexName)
None
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
latest.copy(state = UPDATING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ => {
flintClient.updateIndex(indexName, index.metadata)
logInfo("Update index options complete")
indexRefresh.start(spark, flintSparkConf)
})
}
}
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty

import org.apache.spark.sql.catalog.Column
@@ -59,6 +61,28 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

/**
* Copy Flint index with updated options.
*
* @param index
* Flint index to copy
* @param updateOptions
* options to update
* @return
* updated Flint index
*/
def copyWithUpdate(
index: FlintSparkIndex,
updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
val originalOptions = index.options
val updatedOptions =
originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
FlintSparkIndexFactory.create(updatedMetadata).get
}

/**
* Build method for concrete builder class to implement
*/
@@ -96,7 +96,7 @@ case class FlintSparkMaterializedView(
private def watermark(timeCol: Attribute, child: LogicalPlan) = {
require(
options.watermarkDelay().isDefined,
"watermark delay is required for incremental refresh with aggregation")
"watermark delay is required for auto refresh and incremental refresh with aggregation")

val delay = options.watermarkDelay().get
EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
@@ -102,6 +102,20 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

override def visitAlterCoveringIndexStatement(
ctx: AlterCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val indexOptions = visitPropertyList(ctx.propertyList())
val index = flint
.describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val updatedIndex = flint.coveringIndex().copyWithUpdate(index, indexOptions)
flint.updateIndex(updatedIndex)
Seq.empty
}
}

override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>