diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index debf71a11..b9ef05851 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -5,10 +5,9 @@ package org.opensearch.flint.core; -import java.util.List; +import java.util.Map; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.storage.FlintReader; import org.opensearch.flint.core.storage.FlintWriter; @@ -18,24 +17,6 @@ */ public interface FlintClient { - /** - * Start a new optimistic transaction. - * - * @param indexName index name - * @return transaction handle - */ - OptimisticTransaction startTransaction(String indexName); - - /** - * - * Start a new optimistic transaction. - * - * @param indexName index name - * @param forceInit forceInit create empty translog if not exist. - * @return transaction handle - */ - OptimisticTransaction startTransaction(String indexName, boolean forceInit); - /** * Create a Flint index with the metadata given. * @@ -56,9 +37,10 @@ public interface FlintClient { * Retrieve all metadata for Flint index whose name matches the given pattern. * * @param indexNamePattern index name pattern - * @return all matched index metadata + * @return map where the keys are the matched index names, and the values are + * corresponding index metadata */ - List getAllIndexMetadata(String indexNamePattern); + Map getAllIndexMetadata(String indexNamePattern); /** * Retrieve metadata in a Flint index. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java new file mode 100644 index 000000000..a356a456f --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata.log; + +import java.util.Optional; + +/** + * Flint metadata log service provides API for metadata log related operations on a Flint index + * regardless of underlying storage. + */ +public interface FlintMetadataLogService { + + /** + * Start a new optimistic transaction. + * + * @param indexName index name + * @param forceInit force init transaction and create empty metadata log if not exist + * @return transaction handle + */ + OptimisticTransaction startTransaction(String indexName, boolean forceInit); + + /** + * Start a new optimistic transaction. + * + * @param indexName index name + * @return transaction handle + */ + default OptimisticTransaction startTransaction(String indexName) { + return startTransaction(indexName, false); + } + + /** + * Get metadata log for index. + * + * @param indexName index name + * @return optional metadata log + */ + Optional> getIndexMetadataLog(String indexName); + + /** + * Record heartbeat timestamp for index streaming job. + * + * @param indexName index name + */ + void recordHeartbeat(String indexName); +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java new file mode 100644 index 000000000..3e2556f57 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogServiceBuilder.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata.log; + +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; + +/** + * {@link FlintMetadataLogService} builder. + */ +public class FlintMetadataLogServiceBuilder { + public static FlintMetadataLogService build(FlintOptions options) { + return new FlintOpenSearchMetadataLogService(options); + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 202f3cc7d..36db4a040 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -10,10 +10,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -33,16 +32,12 @@ import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.SearchModule; import org.opensearch.search.builder.SearchSourceBuilder; import scala.Option; -import scala.Some; /** * Flint client implementation for OpenSearch storage. @@ -67,47 +62,10 @@ public class FlintOpenSearchClient implements FlintClient { private final static Set INVALID_INDEX_NAME_CHARS = Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); - /** - * Metadata log index name prefix - */ - public final static String META_LOG_NAME_PREFIX = ".query_execution_request"; - private final FlintOptions options; - private final String dataSourceName; - private final String metaLogIndexName; public FlintOpenSearchClient(FlintOptions options) { this.options = options; - this.dataSourceName = options.getDataSourceName(); - this.metaLogIndexName = constructMetaLogIndexName(); - } - - @Override - public OptimisticTransaction startTransaction(String indexName, boolean forceInit) { - LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); - try (IRestHighLevelClient client = createClient()) { - if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { - LOG.info("Found metadata log index " + metaLogIndexName); - } else { - if (forceInit) { - createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(), - Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS())); - } else { - String errorMsg = "Metadata log index not found " + metaLogIndexName; - LOG.warning(errorMsg); - throw new IllegalStateException(errorMsg); - } - } - return new DefaultOptimisticTransaction<>(dataSourceName, - new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName)); - } catch (IOException e) { - throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e); - } - } - - @Override - public OptimisticTransaction startTransaction(String indexName) { - return startTransaction(indexName, false); } @Override @@ -143,7 +101,7 @@ public boolean exists(String indexName) { } @Override - public List getAllIndexMetadata(String indexNamePattern) { + public Map getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); try (IRestHighLevelClient client = createClient()) { @@ -151,11 +109,13 @@ public List getAllIndexMetadata(String indexNamePattern) { GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> constructFlintMetadata( - index, - response.getMappings().get(index).source().toString(), - response.getSettings().get(index).toString())) - .collect(Collectors.toList()); + .collect(Collectors.toMap( + index -> index, + index -> FlintMetadata.apply( + response.getMappings().get(index).source().toString(), + response.getSettings().get(index).toString() + ) + )); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); } @@ -171,7 +131,7 @@ public FlintMetadata getIndexMetadata(String indexName) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return constructFlintMetadata(indexName, mapping.source().string(), settings.toString()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } @@ -241,34 +201,6 @@ public IRestHighLevelClient createClient() { return OpenSearchClientUtils.createClient(options); } - /* - * Constructs Flint metadata with latest metadata log entry attached if it's available. - * It relies on FlintOptions to provide data source name. - */ - private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) { - String dataSourceName = options.getDataSourceName(); - String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX - : META_LOG_NAME_PREFIX + "_" + dataSourceName; - Optional latest = Optional.empty(); - - try (IRestHighLevelClient client = createClient()) { - if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { - LOG.info("Found metadata log index " + metaLogIndexName); - FlintOpenSearchMetadataLog metadataLog = - new FlintOpenSearchMetadataLog(options, indexName, metaLogIndexName); - latest = metadataLog.getLatest(); - } - } catch (IOException e) { - throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e); - } - - if (latest.isEmpty()) { - return FlintMetadata.apply(mapping, settings); - } else { - return FlintMetadata.apply(mapping, settings, latest.get()); - } - } - /* * Because OpenSearch requires all lowercase letters in index name, we have to * lowercase all letters in the given Flint index name. @@ -305,8 +237,4 @@ private String sanitizeIndexName(String indexName) { String encoded = percentEncode(indexName); return toLowercase(encoded); } - - private String constructMetaLogIndexName() { - return dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; - } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 30c711e9a..6aea13436 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -44,16 +44,16 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog getLatest() { LOG.info("Fetching latest log entry with id " + latestId); try (IRestHighLevelClient client = createOpenSearchClient()) { GetResponse response = - client.get(new GetRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); + client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); if (response.isExists()) { FlintMetadataLogEntry latest = new FlintMetadataLogEntry( @@ -105,7 +105,7 @@ public void purge() { try (IRestHighLevelClient client = createOpenSearchClient()) { DeleteResponse response = client.delete( - new DeleteRequest(metaLogIndexName, latestId), RequestOptions.DEFAULT); + new DeleteRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); LOG.info("Purged log entry with result " + response.getResult()); } catch (Exception e) { @@ -129,7 +129,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { return writeLogEntry(logEntryWithId, client -> client.index( new IndexRequest() - .index(metaLogIndexName) + .index(metadataLogIndexName) .id(logEntryWithId.id()) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .source(logEntryWithId.toJson(), XContentType.JSON), @@ -140,7 +140,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Updating log entry " + logEntry); return writeLogEntry(logEntry, client -> client.update( - new UpdateRequest(metaLogIndexName, logEntry.id()) + new UpdateRequest(metadataLogIndexName, logEntry.id()) .doc(logEntry.toJson(), XContentType.JSON) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .setIfSeqNo(logEntry.seqNo()) @@ -173,11 +173,11 @@ private FlintMetadataLogEntry writeLogEntry( } private boolean exists() { - LOG.info("Checking if Flint index exists " + metaLogIndexName); + LOG.info("Checking if Flint index exists " + metadataLogIndexName); try (IRestHighLevelClient client = createOpenSearchClient()) { - return client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT); + return client.doesIndexExist(new GetIndexRequest(metadataLogIndexName), RequestOptions.DEFAULT); } catch (IOException e) { - throw new IllegalStateException("Failed to check if Flint index exists " + metaLogIndexName, e); + throw new IllegalStateException("Failed to check if Flint index exists " + metadataLogIndexName, e); } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java new file mode 100644 index 000000000..f04a3bc67 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import java.io.IOException; +import java.util.Optional; +import java.util.logging.Logger; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.IRestHighLevelClient; +import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; +import org.opensearch.flint.core.metadata.log.FlintMetadataLog; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; +import org.opensearch.flint.core.metadata.log.OptimisticTransaction; + +/** + * Flint metadata log service implementation for OpenSearch storage. + */ +public class FlintOpenSearchMetadataLogService implements FlintMetadataLogService { + + private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLogService.class.getName()); + + public final static String METADATA_LOG_INDEX_NAME_PREFIX = ".query_execution_request"; + + private final FlintOptions options; + private final String dataSourceName; + private final String metadataLogIndexName; + + public FlintOpenSearchMetadataLogService(FlintOptions options) { + this.options = options; + this.dataSourceName = options.getDataSourceName(); + this.metadataLogIndexName = constructMetadataLogIndexName(); + } + + @Override + public OptimisticTransaction startTransaction(String indexName, boolean forceInit) { + LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); + Optional> metadataLog = getIndexMetadataLog(indexName, forceInit); + if (metadataLog.isEmpty()) { + String errorMsg = "Metadata log index not found " + metadataLogIndexName; + throw new IllegalStateException(errorMsg); + } + return new DefaultOptimisticTransaction<>(dataSourceName, metadataLog.get()); + } + + @Override + public Optional> getIndexMetadataLog(String indexName) { + return getIndexMetadataLog(indexName, false); + } + + @Override + public void recordHeartbeat(String indexName) { + startTransaction(indexName) + .initialLog(latest -> latest.state() == IndexState$.MODULE$.REFRESHING()) + .finalLog(latest -> latest) // timestamp will update automatically + .commit(latest -> null); + } + + private Optional> getIndexMetadataLog(String indexName, boolean initIfNotExist) { + LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName); + try (IRestHighLevelClient client = createOpenSearchClient()) { + if (client.doesIndexExist(new GetIndexRequest(metadataLogIndexName), RequestOptions.DEFAULT)) { + LOG.info("Found metadata log index " + metadataLogIndexName); + } else { + if (initIfNotExist) { + initIndexMetadataLog(); + } else { + String errorMsg = "Metadata log index not found " + metadataLogIndexName; + LOG.warning(errorMsg); + return Optional.empty(); + } + } + return Optional.of(new FlintOpenSearchMetadataLog(options, indexName, metadataLogIndexName)); + } catch (IOException e) { + throw new IllegalStateException("Failed to check if index metadata log index exists " + metadataLogIndexName, e); + } + } + + private void initIndexMetadataLog() { + LOG.info("Initializing metadata log index " + metadataLogIndexName); + try (IRestHighLevelClient client = createOpenSearchClient()) { + CreateIndexRequest request = new CreateIndexRequest(metadataLogIndexName); + request.mapping(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(), XContentType.JSON); + request.settings(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS(), XContentType.JSON); + client.createIndex(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to initialize metadata log index " + metadataLogIndexName, e); + } + } + + private String constructMetadataLogIndexName() { + return dataSourceName.isEmpty() ? METADATA_LOG_INDEX_NAME_PREFIX : METADATA_LOG_INDEX_NAME_PREFIX + "_" + dataSourceName; + } + + private IRestHighLevelClient createOpenSearchClient() { + return OpenSearchClientUtils.createClient(options); + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index ae8a9c064..df7c92636 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -10,6 +10,8 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN @@ -43,12 +45,15 @@ class FlintSpark(val spark: SparkSession) extends Logging { /** Flint client for low-level index operation */ private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) + private val flintMetadataLogService: FlintMetadataLogService = + FlintMetadataLogServiceBuilder.build(flintSparkConf.flintOptions()) + /** Required by json4s parse function */ implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer /** Flint Spark index monitor */ val flintIndexMonitor: FlintSparkIndexMonitor = - new FlintSparkIndexMonitor(spark, flintClient) + new FlintSparkIndexMonitor(spark, flintMetadataLogService) /** * Create index builder for creating index with fluent API. @@ -98,7 +103,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { } else { val metadata = index.metadata() try { - flintClient + flintMetadataLogService .startTransaction(indexName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) @@ -134,7 +139,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) try { - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE) .transientLog(latest => @@ -172,6 +177,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .getAllIndexMetadata(indexNamePattern) .asScala + .map { case (indexName, metadata) => + attachLatestLogEntry(indexName, metadata) + } + .toList .flatMap(FlintSparkIndexFactory.create) } else { Seq.empty @@ -190,7 +199,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Describing index name $indexName") if (flintClient.exists(indexName)) { val metadata = flintClient.getIndexMetadata(indexName) - FlintSparkIndexFactory.create(metadata) + val metadataWithEntry = attachLatestLogEntry(indexName, metadata) + FlintSparkIndexFactory.create(metadataWithEntry) } else { Option.empty } @@ -241,7 +251,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Deleting Flint index $indexName") if (flintClient.exists(indexName)) { try { - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED) @@ -276,7 +286,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Vacuuming Flint index $indexName") if (flintClient.exists(indexName)) { try { - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == DELETED) .transientLog(latest => latest.copy(state = VACUUMING)) @@ -307,7 +317,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val index = describeIndex(indexName) if (index.exists(_.options.autoRefresh())) { try { - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) .transientLog(latest => @@ -338,7 +348,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { * interim, but metadata log get deleted by this cleanup process. */ logWarning("Cleaning up metadata log as index data has been deleted") - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(_ => true) .finalLog(_ => NO_LOG_ENTRY) @@ -382,6 +392,27 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } + /** + * Attaches latest log entry to metadata if available. + * + * @param indexName + * index name + * @param metadata + * base flint metadata + * @return + * flint metadata with latest log entry attached if available + */ + private def attachLatestLogEntry(indexName: String, metadata: FlintMetadata): FlintMetadata = { + val latest = flintMetadataLogService + .getIndexMetadataLog(indexName) + .flatMap(_.getLatest) + if (latest.isPresent) { + metadata.copy(latestLogEntry = Some(latest.get())) + } else { + metadata + } + } + /** * Validate the index update options are allowed. * @param originalOptions @@ -428,7 +459,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = { val indexName = index.name val indexLogEntry = index.latestLogEntry.get - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) @@ -447,7 +478,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { val indexName = index.name val indexLogEntry = index.latestLogEntry.get val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 2ca527f1e..815dfa71a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -16,8 +16,8 @@ import scala.sys.addShutdownHook import dev.failsafe.{Failsafe, RetryPolicy} import dev.failsafe.event.ExecutionAttemptedEvent import dev.failsafe.function.CheckedRunnable -import org.opensearch.flint.core.FlintClient import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} +import org.opensearch.flint.core.metadata.log.FlintMetadataLogService import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.apache.spark.internal.Logging @@ -30,10 +30,13 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor * * @param spark * Spark session - * @param flintClient - * Flint client + * @param flintMetadataLogService + * Flint metadata log service */ -class FlintSparkIndexMonitor(spark: SparkSession, flintClient: FlintClient) extends Logging { +class FlintSparkIndexMonitor( + spark: SparkSession, + flintMetadataLogService: FlintMetadataLogService) + extends Logging { /** Task execution initial delay in seconds */ private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds() @@ -153,11 +156,7 @@ class FlintSparkIndexMonitor(spark: SparkSession, flintClient: FlintClient) exte try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") - flintClient - .startTransaction(indexName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(_ => {}) + flintMetadataLogService.recordHeartbeat(indexName) } else { logError("Streaming job is not active. Cancelling monitor task") stopMonitor(indexName) @@ -205,7 +204,7 @@ class FlintSparkIndexMonitor(spark: SparkSession, flintClient: FlintClient) exte private def retryUpdateIndexStateToFailed(indexName: String): Unit = { logInfo(s"Updating index state to failed for $indexName") retry { - flintClient + flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING) .finalLog(latest => latest.copy(state = FAILED)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index 1e2219600..f37bb53f7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -19,7 +19,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState -import org.opensearch.flint.core.storage.FlintOpenSearchClient._ +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX import org.opensearch.flint.spark.FlintSparkSuite import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME @@ -31,7 +31,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME trait OpenSearchTransactionSuite extends FlintSparkSuite { val testDataSourceName = "myglue" - lazy val testMetaLogIndex: String = META_LOG_NAME_PREFIX + "_" + testDataSourceName + lazy val testMetaLogIndex: String = METADATA_LOG_INDEX_NAME_PREFIX + "_" + testDataSourceName override def beforeAll(): Unit = { super.beforeAll() diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala new file mode 100644 index 000000000..f8a8c2164 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import java.util.Base64 + +import scala.collection.JavaConverters._ + +import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME + +class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { + + val testFlintIndex = "flint_test_index" + val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) + val testCreateTime = 1234567890123L + val flintMetadataLogEntry = FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = testCreateTime, + state = ACTIVE, + dataSource = testDataSourceName, + error = "") + + var flintMetadataLogService: FlintMetadataLogService = _ + + override def beforeAll(): Unit = { + super.beforeAll() + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName) + val flintOptions = new FlintOptions(options.asJava) + flintMetadataLogService = new FlintOpenSearchMetadataLogService(flintOptions) + } + + test("should fail if metadata log index doesn't exists") { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintMetadataLogService = + new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) + + the[IllegalStateException] thrownBy { + flintMetadataLogService.startTransaction(testFlintIndex) + } + } + + test("should get index metadata log without log entry") { + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe true + metadataLog.get.getLatest shouldBe empty + } + + test("should get index metadata log with log entry") { + createLatestLogEntry(flintMetadataLogEntry) + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe true + + val latest = metadataLog.get.getLatest + latest.isPresent shouldBe true + latest.get.id shouldBe testLatestId + latest.get.createTime shouldBe testCreateTime + latest.get.dataSource shouldBe testDataSourceName + latest.get.error shouldBe "" + } + + test("should not get index metadata log if not exist") { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintMetadataLogService = + new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe false + } + + test("should update timestamp when record heartbeat") { + val refreshingLogEntry = flintMetadataLogEntry.copy(state = REFRESHING) + createLatestLogEntry(refreshingLogEntry) + val updateTimeBeforeHeartbeat = + latestLogEntry(testLatestId).get("lastUpdateTime").get.asInstanceOf[Long] + flintMetadataLogService.recordHeartbeat(testFlintIndex) + latestLogEntry(testLatestId) + .get("lastUpdateTime") + .get + .asInstanceOf[Long] should be > updateTimeBeforeHeartbeat + } + + test("should fail when record heartbeat if index not refreshing") { + createLatestLogEntry(flintMetadataLogEntry) + the[IllegalStateException] thrownBy { + flintMetadataLogService.recordHeartbeat(testFlintIndex) + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index f2d1a1b60..1373654aa 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -30,15 +30,6 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M behavior of "Flint OpenSearch client" - it should "throw IllegalStateException if metadata log index doesn't exists" in { - val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") - val flintClient = FlintClientBuilder.build(new FlintOptions(options.asJava)) - - the[IllegalStateException] thrownBy { - flintClient.startTransaction("test") - } - } - it should "create index successfully" in { val indexName = "test" val content = @@ -133,8 +124,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent should not be empty) - allMetadata.forEach(metadata => metadata.indexSettings should not be empty) + allMetadata.values.forEach(metadata => metadata.getContent should not be empty) + allMetadata.values.forEach(metadata => metadata.indexSettings should not be empty) } it should "convert index name to all lowercase" in { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index a34f0c7a5..6da232389 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -11,15 +11,13 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.{JsonMethods, Serialization} -import org.mockito.Mockito.when import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME @@ -27,16 +25,17 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { val testFlintIndex = "flint_test_index" val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) - var flintClient: FlintClient = _ + var flintMetadataLogService: FlintMetadataLogService = _ override def beforeAll(): Unit = { super.beforeAll() val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName) - flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava)) + flintMetadataLogService = new FlintOpenSearchMetadataLogService( + new FlintOptions(options.asJava)) } test("empty metadata log entry content") { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId @@ -50,45 +49,6 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .commit(_ => {}) } - test("get index metadata with latest log entry") { - val testCreateTime = 1234567890123L - val flintMetadataLogEntry = FlintMetadataLogEntry( - id = testLatestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, - createTime = testCreateTime, - state = ACTIVE, - dataSource = testDataSourceName, - error = "") - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry)) - - flintClient.createIndex(testFlintIndex, metadata) - createLatestLogEntry(flintMetadataLogEntry) - - val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry - latest.isDefined shouldBe true - latest.get.id shouldBe testLatestId - latest.get.createTime shouldBe testCreateTime - latest.get.dataSource shouldBe testDataSourceName - latest.get.error shouldBe "" - - deleteTestIndex(testFlintIndex) - } - - test("should get empty metadata log entry") { - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - flintClient.createIndex(testFlintIndex, metadata) - - flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty - - deleteTestIndex(testFlintIndex) - } - test("should preserve original values when transition") { val testCreateTime = 1234567890123L createLatestLogEntry( @@ -101,7 +61,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { dataSource = testDataSourceName, error = "")) - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId @@ -125,7 +85,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { } test("should transit from initial to final log if initial log is empty") { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { latest.state shouldBe EMPTY @@ -139,7 +99,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { } test("should transit from initial to final log directly if no transient log") { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .finalLog(latest => latest.copy(state = ACTIVE)) @@ -161,7 +121,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { dataSource = testDataSourceName, error = "")) - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { latest.state shouldBe ACTIVE @@ -176,7 +136,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should exit if initial log entry doesn't meet precondition") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => false) .transientLog(latest => latest.copy(state = ACTIVE)) @@ -190,7 +150,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if initial log entry updated by others when updating transient log entry") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => { @@ -206,7 +166,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if transient log entry updated by others when updating final log entry") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => { @@ -224,7 +184,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should rollback to initial log if transaction operation failed") { // Use create index scenario in this test case the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => latest.copy(state = CREATING)) @@ -249,7 +209,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { error = "")) the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .transientLog(latest => latest.copy(state = REFRESHING)) @@ -265,7 +225,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { "should not necessarily rollback if transaction operation failed but no transient action") { // Use create index scenario in this test case the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(_ => true) .finalLog(latest => latest.copy(state = ACTIVE)) @@ -278,7 +238,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("forceInit translog, even index is deleted before startTransaction") { deleteIndex(testMetaLogIndex) - flintClient + flintMetadataLogService .startTransaction(testFlintIndex, true) .initialLog(latest => { latest.id shouldBe testLatestId @@ -298,7 +258,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before initial operation") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { deleteIndex(testMetaLogIndex) @@ -312,7 +272,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before transient operation") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => true) .transientLog(latest => { @@ -326,7 +286,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { test("should fail if index is deleted before final operation") { the[IllegalStateException] thrownBy { - flintClient + flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => true) .transientLog(latest => { latest.copy(state = CREATING) })