Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flint index metadata log and transaction support #110

Merged
merged 23 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.opensearch.client.RestHighLevelClient;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

Expand All @@ -18,6 +19,15 @@
*/
public interface FlintClient {

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in future
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName);

/**
* Create a Flint index with the metadata given.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ case class FlintMetadata(
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry */
latestId: Option[String] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

Expand All @@ -58,6 +60,9 @@ case class FlintMetadata(
.field("source", source)
.field("indexedColumns", indexedColumns)

if (latestId.isDefined) {
builder.field("latestId", latestId.get)
}
optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}
Expand Down Expand Up @@ -219,14 +224,14 @@ object FlintMetadata {
def build(): FlintMetadata = {
FlintMetadata(
if (version == null) current() else version,
name,
kind,
source,
indexedColumns,
options,
properties,
schema,
indexSettings)
name = name,
kind = kind,
source = source,
indexedColumns = indexedColumns,
options = options,
properties = properties,
schema = schema,
indexSettings = indexSettings)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

/**
* Default optimistic transaction implementation that captures the basic workflow for
* transaction support by optimistic locking.
*
* @param <T> result type
*/
public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T> {

private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());

/**
* Data source name. TODO: remove this in future.
*/
private final String dataSourceName;

/**
* Flint metadata log
*/
private final FlintMetadataLog<FlintMetadataLogEntry> metadataLog;

private Predicate<FlintMetadataLogEntry> initialCondition = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> transientAction = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

public DefaultOptimisticTransaction(
String dataSourceName,
FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.dataSourceName = dataSourceName;
this.metadataLog = metadataLog;
}

@Override
public DefaultOptimisticTransaction<T> initialLog(
Predicate<FlintMetadataLogEntry> initialCondition) {
this.initialCondition = initialCondition;
return this;
}

@Override
public DefaultOptimisticTransaction<T> transientLog(
Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
this.transientAction = action;
return this;
}

@Override
public DefaultOptimisticTransaction<T> finalLog(
Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
this.finalAction = action;
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
Objects.requireNonNull(initialCondition);
Objects.requireNonNull(finalAction);

// Get the latest log and create if not exists
FlintMetadataLogEntry latest =
metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));

// Perform initial log check
if (initialCondition.test(latest)) {

// Append optional transient log
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));
}

// Perform operation
T result = operation.apply(latest);

// Append final log
metadataLog.add(finalAction.apply(latest));
return result;
} else {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
}
}

private FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
IndexState$.MODULE$.EMPTY(),
dataSourceName,
"");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.Optional;

/**
* Flint metadata log that provides transactional support on write API based on different storage.
*/
public interface FlintMetadataLog<T> {

/**
* Add a new log entry to the metadata log.
*
* @param logEntry log entry
* @return log entry after add
*/
T add(T logEntry);

/**
* Get the latest log entry in the metadata log.
*
* @return latest log entry
*/
Optional<T> getLatest();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
* implementation specific field, such as seqNo, primaryTerm, dataSource to properties.
*
* @param id
* log entry id
* @param seqNo
* OpenSearch sequence number
* @param primaryTerm
* OpenSearch primary term
* @param state
* Flint index state
* @param dataSource
* OpenSearch data source associated //TODO: remove?
* @param error
* error details if in error state
*/
case class FlintMetadataLogEntry(
id: String,
seqNo: Long,
primaryTerm: Long,
state: IndexState,
dataSource: String, // TODO: get from Spark conf
error: String) {

def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
id,
seqNo,
primaryTerm,
IndexState.from(map.get("state").asInstanceOf[String]),
map.get("dataSourceName").asInstanceOf[String],
map.get("error").asInstanceOf[String])
}

def toJson: String = {
// Implicitly populate latest appId, jobId and timestamp whenever persist
s"""
|{
| "version": "1.0",
| "type": "flintindexstate",
| "state": "$state",
| "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}",
| "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}",
| "dataSourceName": "$dataSource",
| "lastUpdateTime": "${System.currentTimeMillis()}",
| "error": "$error"
|}
|""".stripMargin
}
}

object FlintMetadataLogEntry {

/**
* Flint index state enum.
*/
object IndexState extends Enumeration {
type IndexState = Value
val EMPTY: IndexState.Value = Value("empty")
val CREATING: IndexState.Value = Value("creating")
val ACTIVE: IndexState.Value = Value("active")
val REFRESHING: IndexState.Value = Value("refreshing")
val DELETING: IndexState.Value = Value("deleting")
val DELETED: IndexState.Value = Value("deleted")
val FAILED: IndexState.Value = Value("failed")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
IndexState.values
.find(_.toString.equalsIgnoreCase(s))
.getOrElse(IndexState.UNKNOWN)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* Optimistic transaction interface that represents a state transition on the state machine.
* In particular, this abstraction is trying to express:
* initial log (precondition)
* => transient log (with pending operation to do)
* => final log (after operation succeeds)
* For example, "empty" => creating (operation is to create index) => active
*
* @param <T> result type
*/
public interface OptimisticTransaction<T> {

/**
* @param initialCondition initial precondition that the subsequent transition and action can proceed
* @return this transaction
*/
OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition);

/**
* @param action action to generate transient log entry
* @return this transaction
*/
OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* @param action action to generate final log entry
* @return this transaction
*/
OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action);

/**
* Execute the given operation with the given log transition above.
*
* @param operation operation
* @return result
*/
T commit(Function<FlintMetadataLogEntry, T> operation);

/**
* No optimistic transaction.
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public T commit(Function<FlintMetadataLogEntry, T> operation) {
return operation.apply(null);
}
};
}
Loading
Loading