diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 50f4a64b2..5f22e0e81 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,2 @@ # This should match the owning team set up in https://github.com/orgs/opensearch-project/teams -* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha +* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 940ffa150..4c5b6c255 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -15,3 +15,4 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Lior Perry | [yangdb](https://github.com/YANG-DB) | Amazon | | Sean Kao | [seankao-az](https://github.com/seankao-az) | Amazon | | Anirudha Jadhav | [anirudha](https://github.com/anirudha) | Amazon | +| Kaituo Li | [kaituo](https://github.com/kaituo) | Amazon | diff --git a/docs/index.md b/docs/index.md index 16af00ed8..288ea8e4f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -264,6 +264,20 @@ WITH ( ) ``` +### Index Job Management + +Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below. + +```sql +RECOVER INDEX JOB +``` + +Example: + +```sql +RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index` +``` + ## Index Store ### OpenSearch @@ -342,8 +356,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.write.id_name`: no default value. - `spark.datasource.flint.ignore.id_column` : default value is true. - `spark.datasource.flint.write.batch_size`: default value is 1000. -- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), - IMMEDIATE(true), WAIT_UNTIL(wait_for)] +- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE + (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] - `spark.datasource.flint.read.scroll_size`: default value is 100. - `spark.flint.optimizer.enabled`: default is true. - `spark.flint.index.hybridscan.enabled`: default is false. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 60b4bfc8c..c44dd939f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -58,7 +58,7 @@ public class FlintOptions implements Serializable { * * WAIT_UNTIL("wait_for") */ - public static final String DEFAULT_REFRESH_POLICY = "false"; + public static final String DEFAULT_REFRESH_POLICY = "wait_for"; public FlintOptions(Map options) { this.options = options; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 2019d8812..48782a303 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -5,6 +5,8 @@ package org.opensearch.flint.core.metadata.log; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -76,23 +78,46 @@ public T commit(Function operation) { metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry())); // Perform initial log check - if (initialCondition.test(latest)) { + if (!initialCondition.test(latest)) { + LOG.warning("Initial log entry doesn't satisfy precondition " + latest); + throw new IllegalStateException( + "Transaction failed due to initial log precondition not satisfied"); + } - // Append optional transient log - if (transientAction != null) { - latest = metadataLog.add(transientAction.apply(latest)); - } + // Append optional transient log + FlintMetadataLogEntry initialLog = latest; + if (transientAction != null) { + latest = metadataLog.add(transientAction.apply(latest)); + + // Copy latest seqNo and primaryTerm to initialLog for potential rollback use + initialLog = initialLog.copy( + initialLog.id(), + latest.seqNo(), + latest.primaryTerm(), + initialLog.createTime(), + initialLog.state(), + initialLog.dataSource(), + initialLog.error()); + } - // Perform operation + // Perform operation + try { T result = operation.apply(latest); // Append final log metadataLog.add(finalAction.apply(latest)); return result; - } else { - LOG.warning("Initial log entry doesn't satisfy precondition " + latest); - throw new IllegalStateException( - "Transaction failed due to initial log precondition not satisfied"); + } catch (Exception e) { + LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e); + try { + // Roll back transient log if any + if (transientAction != null) { + metadataLog.add(initialLog); + } + } catch (Exception ex) { + LOG.log(WARNING, "Failed to rollback transient log", ex); + } + throw new IllegalStateException("Failed to commit transaction operation"); } } @@ -101,6 +126,7 @@ private FlintMetadataLogEntry emptyLogEntry() { "", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, + 0L, IndexState$.MODULE$.EMPTY(), dataSourceName, ""); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index 8b67ac2ae..fea9974c6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -29,8 +29,13 @@ case class FlintMetadataLogEntry( id: String, seqNo: Long, primaryTerm: Long, + /** + * This is currently used as streaming job start time. In future, this should represent the + * create timestamp of the log entry + */ + createTime: Long, state: IndexState, - dataSource: String, // TODO: get from Spark conf + dataSource: String, error: String) { def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) { @@ -38,6 +43,8 @@ case class FlintMetadataLogEntry( id, seqNo, primaryTerm, + /* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */ + map.get("jobStartTime").asInstanceOf[Number].longValue(), IndexState.from(map.get("state").asInstanceOf[String]), map.get("dataSourceName").asInstanceOf[String], map.get("error").asInstanceOf[String]) @@ -48,12 +55,14 @@ case class FlintMetadataLogEntry( s""" |{ | "version": "1.0", + | "latestId": "$id", | "type": "flintindexstate", | "state": "$state", | "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}", | "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}", | "dataSourceName": "$dataSource", - | "lastUpdateTime": "${System.currentTimeMillis()}", + | "jobStartTime": $createTime, + | "lastUpdateTime": ${System.currentTimeMillis()}, | "error": "$error" |} |""".stripMargin @@ -74,6 +83,7 @@ object FlintMetadataLogEntry { val DELETING: IndexState.Value = Value("deleting") val DELETED: IndexState.Value = Value("deleted") val FAILED: IndexState.Value = Value("failed") + val RECOVERING: IndexState.Value = Value("recovering") val UNKNOWN: IndexState.Value = Value("unknown") def from(s: String): IndexState.Value = { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index dc2efc595..07029d608 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -98,6 +98,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { latestId, logEntry.seqNo(), logEntry.primaryTerm(), + logEntry.createTime(), logEntry.state(), logEntry.dataSource(), logEntry.error()); @@ -135,6 +136,7 @@ private FlintMetadataLogEntry writeLogEntry( logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), + logEntry.createTime(), logEntry.state(), logEntry.dataSource(), logEntry.error()); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d916c8ad6..af44bb6a6 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core.storage; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -17,12 +18,16 @@ import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; /** * {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/ */ public class OpenSearchScrollReader extends OpenSearchReader { + private static final Logger LOG = Logger.getLogger(OpenSearchScrollReader.class.getName()); + /** Default scroll context timeout in minutes. */ public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L); @@ -54,10 +59,16 @@ SearchResponse search(SearchRequest request) throws IOException { * clean the scroll context. */ void clean() throws IOException { - if (!Strings.isNullOrEmpty(scrollId)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + try { + if (!Strings.isNullOrEmpty(scrollId)) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + } + } catch (OpenSearchStatusException e) { + // OpenSearch throw exception if scroll already closed. https://github.com/opensearch-project/OpenSearch/issues/11121 + LOG.log(Level.WARNING, "close scroll exception, it is a known bug https://github" + + ".com/opensearch-project/OpenSearch/issues/11121.", e); } } } diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index e44944fcf..cb2e14144 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -18,6 +18,7 @@ statement : skippingIndexStatement | coveringIndexStatement | materializedViewStatement + | indexJobManagementStatement ; skippingIndexStatement @@ -109,6 +110,14 @@ dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; +indexJobManagementStatement + : recoverIndexJobStatement + ; + +recoverIndexJobStatement + : RECOVER INDEX JOB identifier + ; + /* * Match all remaining tokens in non-greedy way * so WITH clause won't be captured by this rule. diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 597a1e585..fe6fd3c66 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -165,10 +165,12 @@ IF: 'IF'; IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +JOB: 'JOB'; MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; +RECOVER: 'RECOVER'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala index 0bac6ac73..eba99b809 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala @@ -7,12 +7,25 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.util.ShutdownHookManager /** * Flint utility methods that rely on access to private code in Spark SQL package. */ package object flint { + /** + * Add shutdown hook to SparkContext with default priority. + * + * @param hook + * hook with the code to run during shutdown + * @return + * a handle that can be used to unregister the shutdown hook. + */ + def addShutdownHook(hook: () => Unit): AnyRef = { + ShutdownHookManager.addShutdownHook(hook) + } + /** * Convert the given logical plan to Spark data frame. * diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 2713f464a..e9331113a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark -import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} - import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} @@ -46,9 +44,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer - /** Scheduler for updating index state regularly as needed, such as incremental refreshing */ - private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) - /** * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in * future @@ -56,6 +51,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { private val dataSourceName: String = spark.conf.getOption("spark.flint.datasource.name").getOrElse("") + /** Flint Spark index monitor */ + private val flintIndexMonitor: FlintSparkIndexMonitor = + new FlintSparkIndexMonitor(spark, flintClient, dataSourceName) + /** * Create index builder for creating index with fluent API. * @@ -106,15 +105,17 @@ class FlintSpark(val spark: SparkSession) extends Logging { try { flintClient .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == EMPTY) + .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(latest => if (latest == null) { // in case transaction capability is disabled flintClient.createIndex(indexName, metadata) } else { + logInfo(s"Creating index with metadata log entry ID ${latest.id}") flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) }) + logInfo("Create index complete") } catch { case e: Exception => logError("Failed to create Flint index", e) @@ -142,14 +143,17 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == ACTIVE) - .transientLog(latest => latest.copy(state = REFRESHING)) + .transientLog(latest => + latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) .finalLog(latest => { // Change state to active if full, otherwise update index state regularly if (mode == FULL) { + logInfo("Updating index state to active") latest.copy(state = ACTIVE) } else { // Schedule regular update and return log entry as refreshing state - scheduleIndexStateUpdate(indexName) + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(indexName) latest } }) @@ -219,6 +223,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { .finalLog(latest => latest.copy(state = DELETED)) .commit(_ => { // TODO: share same transaction for now + flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) flintClient.deleteIndex(indexName) true @@ -229,6 +234,42 @@ class FlintSpark(val spark: SparkSession) extends Logging { throw new IllegalStateException("Failed to delete Flint index") } } else { + logInfo("Flint index to be deleted doesn't exist") + false + } + } + + /** + * Recover index job. + * + * @param indexName + * index name + */ + def recoverIndex(indexName: String): Boolean = { + logInfo(s"Recovering Flint index $indexName") + val index = describeIndex(indexName) + if (index.exists(_.options.autoRefresh())) { + try { + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) + .transientLog(latest => + latest.copy(state = RECOVERING, createTime = System.currentTimeMillis())) + .finalLog(latest => { + flintIndexMonitor.startMonitor(indexName) + latest.copy(state = REFRESHING) + }) + .commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL)) + + logInfo("Recovery complete") + true + } catch { + case e: Exception => + logError("Failed to recover Flint index", e) + throw new IllegalStateException("Failed to recover Flint index") + } + } else { + logInfo("Index to be recovered either doesn't exist or not auto refreshed") false } } @@ -248,32 +289,12 @@ class FlintSpark(val spark: SparkSession) extends Logging { private def isIncrementalRefreshing(indexName: String): Boolean = spark.streams.active.exists(_.name == indexName) - private def scheduleIndexStateUpdate(indexName: String): Unit = { - executor.scheduleAtFixedRate( - () => { - logInfo("Scheduler triggers index log entry update") - try { - flintClient - .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(latest => logInfo("Updating log entry to " + latest)) - } catch { - case e: Exception => - logError("Failed to update index log entry", e) - throw new IllegalStateException("Failed to update index log entry") - } - }, - 15, // Delay to ensure final logging is complete first, otherwise version conflicts - 60, // TODO: make interval configurable - TimeUnit.SECONDS) - } - // TODO: move to separate class private def doRefreshIndex( index: FlintSparkIndex, indexName: String, mode: RefreshMode): Option[String] = { + logInfo(s"Refreshing index $indexName in $mode mode") val options = index.options val tableName = index.metadata().source @@ -288,17 +309,19 @@ class FlintSpark(val spark: SparkSession) extends Logging { .save(indexName) } - mode match { + val jobId = mode match { case FULL if isIncrementalRefreshing(indexName) => throw new IllegalStateException( s"Index $indexName is incremental refreshing and cannot be manual refreshed") case FULL => + logInfo("Start refreshing index in batch style") batchRefresh() None // Flint index has specialized logic and capability for incremental refresh case INCREMENTAL if index.isInstanceOf[StreamingRefresh] => + logInfo("Start refreshing index in streaming style") val job = index .asInstanceOf[StreamingRefresh] @@ -313,6 +336,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { // Otherwise, fall back to foreachBatch + batch refresh case INCREMENTAL => + logInfo("Start refreshing index in foreach streaming style") val job = spark.readStream .options(options.extraSourceOptions(tableName)) .table(tableName) @@ -325,6 +349,9 @@ class FlintSpark(val spark: SparkSession) extends Logging { .start() Some(job.id.toString) } + + logInfo("Refresh index complete") + jobId } private def stopRefreshingJob(indexName: String): Unit = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 9dcd43843..779e2eac6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -115,8 +115,13 @@ object FlintSparkIndex { * @return * Flint index name */ - def flintIndexNamePrefix(fullTableName: String): String = - s"flint_${fullTableName.replace(".", "_")}_" + def flintIndexNamePrefix(fullTableName: String): String = { + require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified") + + // Keep all parts since the third as it is + val parts = fullTableName.split('.') + s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + } /** * Populate environment variables to persist in Flint metadata. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala new file mode 100644 index 000000000..28e46cb29 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit} + +import scala.collection.concurrent.{Map, TrieMap} +import scala.sys.addShutdownHook + +import org.opensearch.flint.core.FlintClient +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +/** + * Flint Spark index state monitor. + * + * @param spark + * Spark session + * @param flintClient + * Flint client + * @param dataSourceName + * data source name + */ +class FlintSparkIndexMonitor( + spark: SparkSession, + flintClient: FlintClient, + dataSourceName: String) + extends Logging { + + /** + * Start monitoring task on the given Flint index. + * + * @param indexName + * Flint index name + */ + def startMonitor(indexName: String): Unit = { + val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay( + () => { + logInfo(s"Scheduler trigger index monitor task for $indexName") + try { + if (isStreamingJobActive(indexName)) { + logInfo("Streaming job is still active") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => latest.state == REFRESHING) + .finalLog(latest => latest) // timestamp will update automatically + .commit(_ => {}) + } else { + logError("Streaming job is not active. Cancelling monitor task") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = FAILED)) + .commit(_ => {}) + + stopMonitor(indexName) + logInfo("Index monitor task is cancelled") + } + } catch { + case e: Exception => + logError("Failed to update index log entry", e) + throw new IllegalStateException("Failed to update index log entry") + } + }, + 15, // Delay to ensure final logging is complete first, otherwise version conflicts + 60, // TODO: make interval configurable + TimeUnit.SECONDS) + + FlintSparkIndexMonitor.indexMonitorTracker.put(indexName, task) + } + + /** + * Cancel scheduled task on the given Flint index. + * + * @param indexName + * Flint index name + */ + def stopMonitor(indexName: String): Unit = { + logInfo(s"Cancelling scheduled task for index $indexName") + val task = FlintSparkIndexMonitor.indexMonitorTracker.remove(indexName) + if (task.isDefined) { + task.get.cancel(true) + } else { + logInfo(s"Cannot find scheduled task") + } + } + + private def isStreamingJobActive(indexName: String): Boolean = + spark.streams.active.exists(_.name == indexName) +} + +object FlintSparkIndexMonitor extends Logging { + + /** + * Thread-safe ExecutorService globally shared by all FlintSpark instance and will be shutdown + * in Spark application upon exit. Non-final variable for test convenience. + */ + var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) + + /** + * Tracker that stores task future handle which is required to cancel the task in future. + */ + val indexMonitorTracker: Map[String, ScheduledFuture[_]] = + new TrieMap[String, ScheduledFuture[_]]() + + /* + * Register shutdown hook to SparkContext with default priority (higher than SparkContext.close itself) + */ + addShutdownHook(() => { + logInfo("Shutdown scheduled executor service") + try { + executor.shutdownNow() + } catch { + case e: Exception => logWarning("Failed to shutdown scheduled executor service", e) + } + }) +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 3dcc6a13f..9cd781dcb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -113,7 +113,7 @@ object FlintSparkCoveringIndex { tableName.split("\\.").length >= 3, "Qualified table name catalog.database.table is required") - flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX + flintIndexNamePrefix(tableName) + "_" + indexName + COVERING_INDEX_SUFFIX } /** Builder class for covering index build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 019cc7aa5..44a20e487 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -12,7 +12,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala` import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{generateSchemaJSON, metadataBuilder, StreamingRefresh} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE} @@ -158,7 +158,7 @@ object FlintSparkMaterializedView { mvName.split("\\.").length >= 3, "Qualified materialized view name catalog.database.mv is required") - s"flint_${mvName.replace(".", "_")}" + flintIndexNamePrefix(mvName) } /** Builder class for MV build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..d83af5df5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -112,7 +112,7 @@ object FlintSparkSkippingIndex { tableName.split("\\.").length >= 3, "Qualified table name catalog.database.table is required") - flintIndexNamePrefix(tableName) + SKIPPING_INDEX_SUFFIX + flintIndexNamePrefix(tableName) + "_" + SKIPPING_INDEX_SUFFIX } /** Builder class for skipping index build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 606cb88eb..011bb37fe 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder @@ -25,6 +26,7 @@ class FlintSparkSqlAstBuilder with FlintSparkSkippingIndexAstBuilder with FlintSparkCoveringIndexAstBuilder with FlintSparkMaterializedViewAstBuilder + with FlintSparkIndexJobAstBuilder with SparkSqlAstBuilder { override def visit(tree: ParseTree): LogicalPlan = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala new file mode 100644 index 000000000..87dd429e5 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.job + +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.RecoverIndexJobStatementContext + +import org.apache.spark.sql.catalyst.plans.logical.Command + +/** + * Flint Spark AST builder that builds Spark command for Flint index job management statement. + */ +trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => + + override def visitRecoverIndexJobStatement(ctx: RecoverIndexJobStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val jobId = ctx.identifier().getText + flint.recoverIndex(jobId) + Seq.empty + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 266a10c9f..1a990b5b0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -5,8 +5,10 @@ package org.opensearch.flint.spark.sql.mv +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + import org.antlr.v4.runtime.tree.RuleNode -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} +import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} @@ -65,12 +67,16 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito AttributeReference("materialized_view_name", StringType, nullable = false)()) FlintSparkSqlCommand(outputSchema) { flint => - val catalogDbName = ctx.catalogDb.getText - val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*" + val catalogDbName = + ctx.catalogDb.parts + .map(part => part.getText) + .mkString("_") + val indexNamePattern = s"flint_${catalogDbName}_*" flint .describeIndexes(indexNamePattern) .collect { case mv: FlintSparkMaterializedView => - Row(mv.mvName) + // MV name must be qualified when metadata created + Row(mv.mvName.split('.').drop(2).mkString(".")) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index f1f10fe26..ba2868caa 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -24,6 +24,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite with Matchers { index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } + test("get covering index name on table and index name with dots") { + val testTableDots = "spark_catalog.default.test.2023.10" + val index = new FlintSparkCoveringIndex("ci.01", testTableDots, Map("name" -> "string")) + index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index" + } + test("should fail if get index name without full table name") { val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index cb32e74d3..b7746d44a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -16,7 +16,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper} +import org.apache.spark.sql.catalyst.dsl.expressions.{intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper} import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} @@ -36,11 +36,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testMvName = "spark_catalog.default.mv" val testQuery = "SELECT 1" - test("get name") { + test("get mv name") { val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } + test("get mv name with dots") { + val testMvNameDots = "spark_catalog.default.mv.2023.10" + val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Map.empty) + mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" + } + test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy FlintSparkMaterializedView("mv", testQuery, Map.empty).name() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..491b7811a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -30,6 +30,12 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test_skipping_index" } + test("get skipping index name on table name with dots") { + val testTableDots = "spark_catalog.default.test.2023.10" + val index = new FlintSparkSkippingIndex(testTableDots, Seq(mock[FlintSparkSkippingStrategy])) + index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index" + } + test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index a6a1dd889..a8b5a1fa2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -27,6 +27,56 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) } + test("empty metadata log entry content") { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => { + latest.id shouldBe testLatestId + latest.state shouldBe EMPTY + latest.createTime shouldBe 0L + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + true + }) + .finalLog(latest => latest) + .commit(_ => {}) + } + + test("should preserve original values when transition") { + val testCreateTime = 1234567890123L + createLatestLogEntry( + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = testCreateTime, + state = ACTIVE, + dataSource = testDataSourceName, + error = "")) + + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(latest => { + latest.id shouldBe testLatestId + latest.createTime shouldBe testCreateTime + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + true + }) + .transientLog(latest => latest.copy(state = DELETING)) + .finalLog(latest => latest.copy(state = DELETED)) + .commit(latest => { + latest.id shouldBe testLatestId + latest.createTime shouldBe testCreateTime + latest.dataSource shouldBe testDataSourceName + latest.error shouldBe "" + }) + + latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and + contain("dataSourceName" -> testDataSourceName) and + contain("error" -> "")) + } + test("should transit from initial to final log if initial log is empty") { flintClient .startTransaction(testFlintIndex, testDataSourceName) @@ -59,8 +109,9 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { id = testLatestId, seqNo = UNASSIGNED_SEQ_NO, primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = 1234567890123L, state = ACTIVE, - dataSource = "mys3", + dataSource = testDataSourceName, error = "")) flintClient @@ -81,10 +132,13 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { flintClient .startTransaction(testFlintIndex, testDataSourceName) .initialLog(_ => false) - .transientLog(latest => latest) + .transientLog(latest => latest.copy(state = ACTIVE)) .finalLog(latest => latest) .commit(_ => {}) } + + // Initial empty log should not be changed + latestLogEntry(testLatestId) should contain("state" -> "empty") } test("should fail if initial log entry updated by others when updating transient log entry") { @@ -119,4 +173,58 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { }) } } + + test("should rollback to initial log if transaction operation failed") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = CREATING)) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should rollback to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } + + test("should rollback to initial log if updating final log failed") { + // Use refresh index scenario in this test case + createLatestLogEntry( + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = 1234567890123L, + state = ACTIVE, + dataSource = testDataSourceName, + error = "")) + + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .transientLog(latest => latest.copy(state = REFRESHING)) + .finalLog(_ => throw new RuntimeException("Mock final log error")) + .commit(_ => {}) + } + + // Should rollback to initial active log + latestLogEntry(testLatestId) should contain("state" -> "active") + } + + test("should not necessarily rollback if transaction operation failed but no transient action") { + // Use create index scenario in this test case + the[IllegalStateException] thrownBy { + flintClient + .startTransaction(testFlintIndex, testDataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = ACTIVE)) + .commit(_ => throw new RuntimeException("Mock operation error")) + } + + // Should rollback to initial empty log + latestLogEntry(testLatestId) should contain("state" -> "empty") + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala new file mode 100644 index 000000000..365aab83d --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.Base64 + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.scalatest.matchers.should.Matchers + +class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.index_job_test" + private val testIndex = getSkippingIndexName(testTable) + private val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes) + + override def beforeAll(): Unit = { + super.beforeAll() + createPartitionedTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() // must clean up metadata log first and then delete + flint.deleteIndex(testIndex) + } + + test("recover should exit if index doesn't exist") { + flint.recoverIndex("non_exist_index") shouldBe false + } + + test("recover should exit if index is not auto refreshed") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .create() + flint.recoverIndex(testIndex) shouldBe false + } + + test("recover should succeed if index exists and is auto refreshed") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + flint.recoverIndex(testIndex) shouldBe true + spark.streams.active.exists(_.name == testIndex) + latestLogEntry(latestId) should contain("state" -> "refreshing") + } + + Seq(EMPTY, CREATING, DELETING, DELETED, RECOVERING, UNKNOWN).foreach { state => + test(s"recover should fail if index is in $state state") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + updateLatestLogEntry( + new FlintMetadataLogEntry( + latestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(latestId).asJava), + state) + + assertThrows[IllegalStateException] { + flint.recoverIndex(testIndex) shouldBe true + } + } + } + + Seq(ACTIVE, REFRESHING, FAILED).foreach { state => + test(s"recover should succeed if index is in $state state") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + updateLatestLogEntry( + new FlintMetadataLogEntry( + latestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(latestId).asJava), + state) + + flint.recoverIndex(testIndex) shouldBe true + spark.streams.active.exists(_.name == testIndex) + latestLogEntry(latestId) should contain("state" -> "refreshing") + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala new file mode 100644 index 000000000..ddbfeeb16 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.io.File + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.sql.Row + +/** + * This suite doesn't enable transaction to avoid side effect of scheduled update task. + */ +class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { + + private val testTable = "spark_catalog.default.index_job_test" + private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable) + + /** Covering index names */ + private val testIndex = "test_ci" + private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable) + + /** Materialized view names and query */ + private val testMv = "spark_catalog.default.mv_test" + private val testMvQuery = s"SELECT name, age FROM $testTable" + private val testMvIndex = FlintSparkMaterializedView.getFlintIndexName(testMv) + + test("recover skipping index refresh job") { + withFlintIndex(testSkippingIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE SKIPPING INDEX ON $testTable + | (name VALUE_SET) + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testSkippingIndex") + .assertIndexData(indexData => indexData should have size 6) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 06:00:00', 'G', 40, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB `$testSkippingIndex`") // test backtick name + .assertIndexData(indexData => indexData should have size 7) + + } + } + + test("recover covering index refresh job") { + withFlintIndex(testCoveringIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE INDEX $testIndex ON $testTable + | (time, name) + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testCoveringIndex") + .assertIndexData(indexData => indexData should have size 6) + } + } + + test("recover materialized view refresh job") { + withFlintIndex(testMvIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE MATERIALIZED VIEW $testMv + | AS + | $testMvQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testMvIndex") + .assertIndexData(indexData => indexData should have size 6) + } + } + + private def withFlintIndex(flintIndexName: String)(test: AssertionHelper => Unit): Unit = { + withTable(testTable) { + createTimeSeriesTable(testTable) + + withTempDir { checkpointDir => + try { + test(new AssertionHelper(flintIndexName, checkpointDir)) + } finally { + flint.deleteIndex(flintIndexName) + } + } + } + } + + /** + * Recover test assertion helper that de-duplicates test code. + */ + private class AssertionHelper(flintIndexName: String, checkpointDir: File) { + + def run(createIndex: File => String): AssertionHelper = { + sql(createIndex(checkpointDir)) + this + } + + def run(sqlText: String): AssertionHelper = { + sql(sqlText) + this + } + + def assertIndexData(assertion: Array[Row] => Unit): AssertionHelper = { + awaitStreamingComplete(findJobId(flintIndexName)) + assertion(flint.queryIndex(flintIndexName).collect()) + this + } + + def stopStreamingJob(): AssertionHelper = { + spark.streams.get(findJobId(flintIndexName)).stop() + this + } + + private def findJobId(indexName: String): String = { + val job = spark.streams.active.find(_.name == indexName) + job + .map(_.id.toString) + .getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName")) + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 5d6258860..ce77a510c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -184,13 +184,13 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() checkAnswer( sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), - Seq(Row("spark_catalog.default.mv1"))) + Seq(Row("mv1"))) // Show in catalog.database flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() checkAnswer( sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), - Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2"))) + Seq(Row("mv1"), Row("mv2"))) checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 77f39b776..f623db2a9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -5,7 +5,15 @@ package org.opensearch.flint.spark +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture} + +import scala.concurrent.duration.TimeUnit + +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock import org.opensearch.flint.OpenSearchSuite +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite import org.apache.spark.sql.QueryTest @@ -29,6 +37,13 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit // Disable mandatory checkpoint for test convenience setFlintSparkConf(CHECKPOINT_MANDATORY, "false") + + // Replace executor to avoid impact on IT. + // TODO: Currently no IT test scheduler so no need to restore it back. + val mockExecutor = mock[ScheduledExecutorService] + when(mockExecutor.scheduleWithFixedDelay(any[Runnable], any[Long], any[Long], any[TimeUnit])) + .thenAnswer((_: InvocationOnMock) => mock[ScheduledFuture[_]]) + FlintSparkIndexMonitor.executor = mockExecutor } protected def awaitStreamingComplete(jobId: String): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 5376617dd..294449a48 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -10,9 +10,12 @@ import java.util.Base64 import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization +import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -41,7 +44,10 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .addPartitions("year", "month") .create() - latestLogEntry(testLatestId) should contain("state" -> "active") + latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) + and contain("state" -> "active") + and contain("jobStartTime" -> 0) + and contain("dataSourceName" -> testDataSourceName)) implicit val formats: Formats = Serialization.formats(NoTypeHints) val mapping = @@ -63,7 +69,9 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex, FULL) - latestLogEntry(testLatestId) should contain("state" -> "active") + val latest = latestLogEntry(testLatestId) + latest should contain("state" -> "active") + latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L } test("incremental refresh index") { @@ -71,9 +79,23 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .skippingIndex() .onTable(testTable) .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() flint.refreshIndex(testFlintIndex, INCREMENTAL) - latestLogEntry(testLatestId) should contain("state" -> "refreshing") + + // Job start time should be assigned + var latest = latestLogEntry(testLatestId) + latest should contain("state" -> "refreshing") + val prevStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + prevStartTime should be > 0L + + // Restart streaming job + spark.streams.active.head.stop() + flint.recoverIndex(testFlintIndex) + + // Make sure job start time is updated + latest = latestLogEntry(testLatestId) + latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } test("delete index") { @@ -86,4 +108,56 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latestLogEntry(testLatestId) should contain("state" -> "deleted") } + + test("should recreate index if logical deleted") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // Simulate that user deletes index data manually + flint.deleteIndex(testFlintIndex) + latestLogEntry(testLatestId) should contain("state" -> "deleted") + + // Simulate that user recreate the index + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("name") + .create() + } + + test("should not recreate index if index data still exists") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .create() + + // Simulate that PPL plugin leaves index data as logical deleted + deleteLogically(testLatestId) + latestLogEntry(testLatestId) should contain("state" -> "deleted") + + // Simulate that user recreate the index but forgot to cleanup index data + the[IllegalStateException] thrownBy { + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("name") + .create() + } should have message s"Flint index $testFlintIndex already exists" + } + + private def deleteLogically(latestId: String): Unit = { + val response = openSearchClient + .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) + + val latest = new FlintMetadataLogEntry( + latestId, + response.getSeqNo, + response.getPrimaryTerm, + response.getSourceAsMap) + updateLatestLogEntry(latest, DELETED) + } }