Skip to content

Commit

Permalink
Extract OS logic to metadata log
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 30, 2023
1 parent e6e5904 commit 379b78f
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 123 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.Optional;

/**
* Flint metadata log.
*/
public interface FlintMetadataLog<T> {

T add(T logEntry);

Optional<T> getLatest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.I
*/
case class FlintMetadataLogEntry(
id: String,
seqNo: Long = -1,
primaryTerm: Long = -1,
seqNo: Long,
primaryTerm: Long,
state: IndexState,
dataSource: String, // TODO: get from Spark conf
error: String) {

def this(docId: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
docId,
id,
seqNo,
primaryTerm,
IndexState.from(map.get("state").asInstanceOf[String]),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.io.IOException;
import java.util.Base64;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;

/**
* Flint metadata log in OpenSearch store.
*/
public class FlintOpenSearchMetadataLog implements FlintMetadataLog<FlintMetadataLogEntry> {

private static final Logger LOG = Logger.getLogger(FlintOpenSearchMetadataLog.class.getName());

/**
* Flint client to create Rest OpenSearch client (This will be refactored later)
*/
private final FlintClient flintClient;

/**
* Reuse query request index as Flint metadata log store
*/
private final String metadataLogIndexName;

/**
* Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata)
*/
private final String latestId;

public FlintOpenSearchMetadataLog(FlintClient flintClient, String flintIndexName, String metadataLogIndexName) {
this.flintClient = flintClient;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
this.metadataLogIndexName = metadataLogIndexName;
}

@Override
public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
// TODO: use single doc for now. this will be always append in future.
FlintMetadataLogEntry latest;
if (logEntry.id().isEmpty()) {
latest = createLogEntry(logEntry);
} else {
latest = updateLogEntry(logEntry);
}
return latest;
}

@Override
public Optional<FlintMetadataLogEntry> getLatest() {
try (RestHighLevelClient client = flintClient.createClient()) {
GetResponse response =
client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT);
if (response.isExists()) {
return Optional.of(
new FlintMetadataLogEntry(
response.getId(),
response.getSeqNo(),
response.getPrimaryTerm(),
response.getSourceAsMap()));
} else {
return Optional.empty();
}
} catch (Exception e) {
throw new IllegalStateException("Failed to fetch latest metadata log entry", e);
}
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
try (RestHighLevelClient client = flintClient.createClient()) {
logEntry = logEntry.copy(
latestId,
logEntry.seqNo(), logEntry.primaryTerm(), logEntry.state(), logEntry.dataSource(), logEntry.error());

IndexResponse response = client.index(
new IndexRequest()
.index(metadataLogIndexName)
.id(logEntry.id())
.source(logEntry.toJson(), XContentType.JSON),
RequestOptions.DEFAULT);

// Update seqNo and primaryTerm in log entry object
return logEntry.copy(
logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(),
logEntry.state(), logEntry.dataSource(), logEntry.error());
} catch (OpenSearchException | IOException e) {
throw new IllegalStateException("Failed to create initial log entry", e);
}
}

private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Updating log entry " + logEntry);
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateResponse response =
client.update(
new UpdateRequest(metadataLogIndexName, logEntry.id())
.doc(logEntry.toJson(), XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo(logEntry.seqNo())
.setIfPrimaryTerm(logEntry.primaryTerm()),
RequestOptions.DEFAULT);

// Update seqNo and primaryTerm in log entry object
return logEntry.copy(
logEntry.id(), response.getSeqNo(), response.getPrimaryTerm(),
logEntry.state(), logEntry.dataSource(), logEntry.error());
} catch (OpenSearchException | IOException e) {
throw new IllegalStateException("Failed to update log entry: " + logEntry, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* Optimistic transaction interface that represents a state transition on the state machine.
Expand Down Expand Up @@ -45,7 +44,7 @@ public interface OptimisticTransaction<T> {
* @param operation operation
* @return result
*/
T execute(Function<FlintMetadataLogEntry, T> operation);
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
Expand All @@ -67,7 +66,7 @@ public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMe
}

@Override
public T execute(Function<FlintMetadataLogEntry, T> operation) {
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.FlintOpenSearchMetadataLog;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
Expand Down Expand Up @@ -78,7 +79,8 @@ public FlintOpenSearchClient(FlintOptions options) {
String metaLogIndexName = ".query_request_history_mys3";
try (RestHighLevelClient client = createClient()) {
if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
return new OpenSearchOptimisticTransaction<>(this, indexName, metaLogIndexName);
return new OpenSearchOptimisticTransaction<>(
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
} else {
return new NoOptimisticTransaction<>();
}
Expand Down
Loading

0 comments on commit 379b78f

Please sign in to comment.