diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java new file mode 100644 index 000000000..786f4f326 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLog.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata.log; + +import java.util.Optional; + +/** + * Flint metadata log. + */ +public interface FlintMetadataLog { + + T add(T logEntry); + + Optional getLatest(); +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index bd6cc98d2..826074c96 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -27,15 +27,15 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.I */ case class FlintMetadataLogEntry( id: String, - seqNo: Long = -1, - primaryTerm: Long = -1, + seqNo: Long, + primaryTerm: Long, state: IndexState, dataSource: String, // TODO: get from Spark conf error: String) { - def this(docId: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) { + def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) { this( - docId, + id, seqNo, primaryTerm, IndexState.from(map.get("state").asInstanceOf[String]), diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java new file mode 100644 index 000000000..aec6b6cb4 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintOpenSearchMetadataLog.java @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata.log; + +import java.io.IOException; +import java.util.Base64; +import java.util.Optional; +import java.util.logging.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.FlintClient; + +/** + * Flint metadata log in OpenSearch store. + */ +public class FlintOpenSearchMetadataLog implements FlintMetadataLog { + + private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLog.class.getName()); + + /** + * Flint client to create Rest OpenSearch client (This will be refactored later) + */ + private final FlintClient flintClient; + + /** + * Reuse query request index as Flint metadata log store + */ + private final String metadataLogIndexName; + + /** + * Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata) + */ + private final String latestId; + + public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String metadataLogIndexName) { + this.flintClient = flintClient; + this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes()); + this.metadataLogIndexName = metadataLogIndexName; + } + + @Override + public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { + // TODO: use single doc for now. this will be always append in future. + FlintMetadataLogEntry latest; + if (logEntry.id().isEmpty()) { + latest = createLogEntry(logEntry); + } else { + latest = updateLogEntry(logEntry); + } + return latest; + } + + @Override + public Optional getLatest() { + try (RestHighLevelClient client = flintClient.createClient()) { + GetResponse response = + client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); + if (response.isExists()) { + return Optional.of( + new FlintMetadataLogEntry( + response.getId(), + response.getSeqNo(), + response.getPrimaryTerm(), + response.getSourceAsMap())); + } else { + return Optional.empty(); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to fetch latest metadata log entry", e); + } + } + + private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { + LOG.info("Creating log entry " + logEntry); + try (RestHighLevelClient client = flintClient.createClient()) { + logEntry = logEntry.copy( + latestId, + logEntry.seqNo(), logEntry.primaryTerm(), logEntry.state(), logEntry.dataSource(), logEntry.error()); + + IndexResponse response = client.index( + new IndexRequest() + .index(metadataLogIndexName) + .id(logEntry.id()) + .source(logEntry.toJson(), XContentType.JSON), + RequestOptions.DEFAULT); + + // Update seqNo and primaryTerm in log entry object + return logEntry.copy( + logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), + logEntry.state(), logEntry.dataSource(), logEntry.error()); + } catch (OpenSearchException | IOException e) { + throw new IllegalStateException("Failed to create initial log entry", e); + } + } + + private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { + LOG.info("Updating log entry " + logEntry); + try (RestHighLevelClient client = flintClient.createClient()) { + UpdateResponse response = + client.update( + new UpdateRequest(metadataLogIndexName, logEntry.id()) + .doc(logEntry.toJson(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setIfSeqNo(logEntry.seqNo()) + .setIfPrimaryTerm(logEntry.primaryTerm()), + RequestOptions.DEFAULT); + + // Update seqNo and primaryTerm in log entry object + return logEntry.copy( + logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), + logEntry.state(), logEntry.dataSource(), logEntry.error()); + } catch (OpenSearchException | IOException e) { + throw new IllegalStateException("Failed to update log entry: " + logEntry, e); + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java index 2e928a411..3a490a87b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/OptimisticTransaction.java @@ -7,7 +7,6 @@ import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; /** * Optimistic transaction interface that represents a state transition on the state machine. @@ -45,7 +44,7 @@ public interface OptimisticTransaction { * @param operation operation * @return result */ - T execute(Function operation); + T commit(Function operation); /** * No optimistic transaction. @@ -67,7 +66,7 @@ public OptimisticTransaction finalLog(Function operation) { + public T commit(Function operation) { return operation.apply(null); } }; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 3a3f9abc4..4c0a2875a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -43,6 +43,7 @@ import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.metadata.FlintMetadata; +import org.opensearch.flint.core.metadata.log.FlintOpenSearchMetadataLog; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction; import org.opensearch.index.query.AbstractQueryBuilder; @@ -78,7 +79,8 @@ public FlintOpenSearchClient(FlintOptions options) { String metaLogIndexName = ".query_request_history_mys3"; try (RestHighLevelClient client = createClient()) { if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { - return new OpenSearchOptimisticTransaction<>(this, indexName, metaLogIndexName); + return new OpenSearchOptimisticTransaction<>( + new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName)); } else { return new NoOptimisticTransaction<>(); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java index 55553cef9..881778987 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchOptimisticTransaction.java @@ -6,26 +6,14 @@ package org.opensearch.flint.core.storage; import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import java.io.IOException; -import java.util.Base64; import java.util.Objects; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.logging.Logger; -import org.opensearch.OpenSearchException; -import org.opensearch.action.get.GetRequest; -import org.opensearch.action.get.GetResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.WriteRequest; -import org.opensearch.action.update.UpdateRequest; -import org.opensearch.action.update.UpdateResponse; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; @@ -40,135 +28,70 @@ public class OpenSearchOptimisticTransaction implements OptimisticTransaction private static final Logger LOG = Logger.getLogger(OpenSearchOptimisticTransaction.class.getName()); /** - * Flint client to create Rest OpenSearch client (This will be refactored later) + * Flint metadata log */ - private final FlintClient flintClient; - - /** - * Reuse query request index as Flint metadata log store - */ - private final String metadataLogIndexName; - - /** - * Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata) - */ - private final String latestId; + private final FlintMetadataLog metadataLog; private Predicate initialCondition = null; private Function transientAction = null; private Function finalAction = null; - public OpenSearchOptimisticTransaction(FlintClient flintClient, String flintIndexName, String metadataLogIndexName) { - this.flintClient = flintClient; - this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes()); - this.metadataLogIndexName = metadataLogIndexName; + public OpenSearchOptimisticTransaction( + FlintMetadataLog metadataLog) { + this.metadataLog = metadataLog; } @Override - public OpenSearchOptimisticTransaction initialLog(Predicate initialCondition) { + public OpenSearchOptimisticTransaction initialLog( + Predicate initialCondition) { this.initialCondition = initialCondition; return this; } @Override - public OpenSearchOptimisticTransaction transientLog(Function action) { + public OpenSearchOptimisticTransaction transientLog( + Function action) { this.transientAction = action; return this; } @Override - public OpenSearchOptimisticTransaction finalLog(Function action) { + public OpenSearchOptimisticTransaction finalLog( + Function action) { this.finalAction = action; return this; } @Override - public T execute(Function operation) { + public T commit(Function operation) { Objects.requireNonNull(initialCondition); Objects.requireNonNull(transientAction); Objects.requireNonNull(finalAction); - FlintMetadataLogEntry latest = getLatestLogEntry(); - if (latest.id().isEmpty()) { - latest = createLogEntry(latest); - } + FlintMetadataLogEntry latest = + metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry())); if (initialCondition.test(latest)) { // TODO: log entry can be same? - latest = updateLogEntry(transientAction.apply(latest)); + latest = metadataLog.add(transientAction.apply(latest)); T result = operation.apply(latest); - updateLogEntry(finalAction.apply(latest)); + metadataLog.add(finalAction.apply(latest)); return result; } else { - throw new IllegalStateException("Exit due to initial log precondition not satisfied"); + throw new IllegalStateException( + "Transaction failed due to initial log precondition not satisfied"); } } - // TODO: Move all these to FlintLogEntry <- FlintOpenSearchLogEntry - - private FlintMetadataLogEntry getLatestLogEntry() { - RestHighLevelClient client = flintClient.createClient(); - try { - GetResponse response = - client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); - - if (response.isExists()) { - return new FlintMetadataLogEntry( - response.getId(), - response.getSeqNo(), - response.getPrimaryTerm(), - response.getSourceAsMap()); - } else { - return new FlintMetadataLogEntry("", -1, -1, IndexState$.MODULE$.EMPTY(), "mys3", ""); - } - } catch (Exception e) { // TODO: resource not found exception? - throw new IllegalStateException("Failed to fetch latest metadata log entry", e); - } - } - - private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { - LOG.info("Creating log entry " + logEntry); - try (RestHighLevelClient client = flintClient.createClient()) { - logEntry = logEntry.copy( - latestId, - logEntry.seqNo(), logEntry.primaryTerm(), logEntry.state(), logEntry.dataSource(), logEntry.error()); - - IndexResponse response = client.index( - new IndexRequest() - .index(metadataLogIndexName) - .id(logEntry.id()) - .source(logEntry.toJson(), XContentType.JSON), - RequestOptions.DEFAULT); - - // Update seqNo and primaryTerm in log entry object - return logEntry.copy( - logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), - logEntry.state(), logEntry.dataSource(), logEntry.error()); - } catch (OpenSearchException | IOException e) { - throw new IllegalStateException("Failed to create initial log entry", e); - } - } - - private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { - LOG.info("Updating log entry " + logEntry); - try (RestHighLevelClient client = flintClient.createClient()) { - UpdateResponse response = - client.update( - new UpdateRequest(metadataLogIndexName, logEntry.id()) - .doc(logEntry.toJson(), XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setIfSeqNo(logEntry.seqNo()) - .setIfPrimaryTerm(logEntry.primaryTerm()), - RequestOptions.DEFAULT); - - // Update seqNo and primaryTerm in log entry object - return logEntry.copy( - logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(), - logEntry.state(), logEntry.dataSource(), logEntry.error()); - } catch (OpenSearchException | IOException e) { - throw new IllegalStateException("Failed to update log entry: " + logEntry, e); - } + private FlintMetadataLogEntry emptyLogEntry() { + return new FlintMetadataLogEntry( + "", + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + IndexState$.MODULE$.EMPTY(), + "mys3", // TODO: get it from spark conf + ""); } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index ec439b338..39f25415c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -97,7 +97,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { .initialLog(latest => latest.state == EMPTY) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) - .execute(latest => flintClient.createIndex(indexName, metadata)) + .commit(latest => flintClient.createIndex(indexName, metadata)) } catch { case e: Exception => logError("Failed to create Flint index", e) } @@ -131,7 +131,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { latest } }) - .execute(_ => doRefreshIndex(index, indexName, mode)) + .commit(_ => doRefreshIndex(index, indexName, mode)) } // TODO: move to separate class @@ -247,7 +247,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING) .transientLog(latest => latest.copy(state = DELETING)) .finalLog(latest => latest.copy(state = DELETED)) - .execute(_ => { + .commit(_ => { // TODO: share same transaction for now stopRefreshingJob(indexName) flintClient.deleteIndex(indexName) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchTransactionITSuite.scala index 643b54110..d551b5b89 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchTransactionITSuite.scala @@ -12,6 +12,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.FlintSparkSuite +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers class FlintOpenSearchTransactionITSuite @@ -35,7 +36,7 @@ class FlintOpenSearchTransactionITSuite }) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) - .execute(_ => latestLogEntry should contain("state" -> "creating")) + .commit(_ => latestLogEntry should contain("state" -> "creating")) latestLogEntry should contain("state" -> "active") } @@ -43,7 +44,13 @@ class FlintOpenSearchTransactionITSuite test("should transit from initial to final if initial is not empty but meet precondition") { // Create doc first to simulate this scenario createLatestLogEntry( - FlintMetadataLogEntry(id = testLatestId, state = ACTIVE, dataSource = "mys3", error = "")) + FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + state = ACTIVE, + dataSource = "mys3", + error = "")) flintClient .startTransaction(testFlintIndex) @@ -53,7 +60,7 @@ class FlintOpenSearchTransactionITSuite }) .transientLog(latest => latest.copy(state = REFRESHING)) .finalLog(latest => latest.copy(state = ACTIVE)) - .execute(_ => latestLogEntry should contain("state" -> "refreshing")) + .commit(_ => latestLogEntry should contain("state" -> "refreshing")) latestLogEntry should contain("state" -> "active") } @@ -65,7 +72,7 @@ class FlintOpenSearchTransactionITSuite .initialLog(_ => false) .transientLog(latest => latest) .finalLog(latest => latest) - .execute(_ => {}) + .commit(_ => {}) } } @@ -81,7 +88,7 @@ class FlintOpenSearchTransactionITSuite latest.copy(state = CREATING) }) .finalLog(latest => latest) - .execute(_ => {}) + .commit(_ => {}) } } @@ -95,7 +102,7 @@ class FlintOpenSearchTransactionITSuite latest.copy(state = CREATING) }) .finalLog(latest => latest) - .execute(latest => { + .commit(latest => { // This update will happen first and thus cause version conflict as expected updateLatestLogEntry(latest, DELETING) })