Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate metadata log entry data model and persistence #406

Merged
merged 10 commits into from
Jul 15, 2024
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ lazy val flintCore = (project in file("flint-core"))
"org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test",
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test",
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test",
"com.typesafe.play" %% "play-json" % "2.9.2" % "test",
"com.google.truth" % "truth" % "1.1.5" % "test",
"net.aichler" % "jupiter-interface" % "0.11.1" % Test
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface FlintMetadataLog<T> {
* Remove all log entries.
*/
void purge();

T emptyLogEntry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,56 @@

package org.opensearch.flint.common.metadata.log

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
* implementation specific field, such as seqNo, primaryTerm, dataSource to properties.
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here.
*
* @param id
* log entry id
* @param seqNo
* OpenSearch sequence number
* @param primaryTerm
* OpenSearch primary term
* @param state
* Flint index state
* @param dataSource
* OpenSearch data source associated //TODO: remove?
* @param entryVersion
* entry version fields for consistency control
* @param error
* error details if in error state
* @param storageContext
* extra context fields required for storage
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
*/
case class FlintMetadataLogEntry(
id: String,
seqNo: Long,
primaryTerm: Long,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
*/
createTime: Long,
state: IndexState,
dataSource: String,
error: String) {
entryVersion: Map[String, Any],
error: String,
storageContext: Map[String, Any]) {

def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
id,
seqNo,
primaryTerm,
/* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */
map.get("jobStartTime").asInstanceOf[Number].longValue(),
IndexState.from(map.get("state").asInstanceOf[String]),
map.get("dataSourceName").asInstanceOf[String],
map.get("error").asInstanceOf[String])
def this(
id: String,
createTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap)
}

def toJson: String = {
// Implicitly populate latest appId, jobId and timestamp whenever persist
s"""
|{
| "version": "1.0",
| "latestId": "$id",
| "type": "flintindexstate",
| "state": "$state",
| "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}",
| "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}",
| "dataSourceName": "$dataSource",
| "jobStartTime": $createTime,
| "lastUpdateTime": ${System.currentTimeMillis()},
| "error": "$error"
|}
|""".stripMargin
def this(
id: String,
createTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext)
}
}

Expand Down Expand Up @@ -94,70 +83,4 @@ object FlintMetadataLogEntry {
.getOrElse(IndexState.UNKNOWN)
}
}

val QUERY_EXECUTION_REQUEST_MAPPING: String =
"""{
| "dynamic": false,
| "properties": {
| "version": {
| "type": "keyword"
| },
| "type": {
| "type": "keyword"
| },
| "state": {
| "type": "keyword"
| },
| "statementId": {
| "type": "keyword"
| },
| "applicationId": {
| "type": "keyword"
| },
| "sessionId": {
| "type": "keyword"
| },
| "sessionType": {
| "type": "keyword"
| },
| "error": {
| "type": "text"
| },
| "lang": {
| "type": "keyword"
| },
| "query": {
| "type": "text"
| },
| "dataSourceName": {
| "type": "keyword"
| },
| "submitTime": {
| "type": "date",
| "format": "strict_date_time||epoch_millis"
| },
| "jobId": {
| "type": "keyword"
| },
| "lastUpdateTime": {
| "type": "date",
| "format": "strict_date_time||epoch_millis"
| },
| "queryId": {
| "type": "keyword"
| },
| "excludeJobIds": {
| "type": "keyword"
| }
| }
|}""".stripMargin

val QUERY_EXECUTION_REQUEST_SETTINGS: String =
"""{
| "index": {
| "number_of_shards": "1",
| "auto_expand_replicas": "0-2",
| "number_of_replicas": "0"
| }
|}""".stripMargin
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.util.Objects;
import java.util.function.Function;
Expand All @@ -30,11 +27,6 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>

private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());

/**
* Data source name. TODO: remove this in future.
*/
private final String dataSourceName;

/**
* Flint metadata log
*/
Expand All @@ -44,10 +36,7 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> transientAction = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

public DefaultOptimisticTransaction(
String dataSourceName,
FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.dataSourceName = dataSourceName;
public DefaultOptimisticTransaction(FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.metadataLog = metadataLog;
}

Expand Down Expand Up @@ -79,7 +68,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {

// Get the latest log and create if not exists
FlintMetadataLogEntry latest =
metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));
metadataLog.getLatest().orElseGet(() -> metadataLog.add(metadataLog.emptyLogEntry()));

// Perform initial log check
if (!initialCondition.test(latest)) {
Expand All @@ -93,15 +82,14 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));

// Copy latest seqNo and primaryTerm to initialLog for potential rollback use
// Copy latest entryVersion to initialLog for potential rollback use
initialLog = initialLog.copy(
initialLog.id(),
latest.seqNo(),
latest.primaryTerm(),
initialLog.createTime(),
initialLog.state(),
initialLog.dataSource(),
initialLog.error());
latest.entryVersion(),
initialLog.error(),
initialLog.storageContext());
}

// Perform operation
Expand Down Expand Up @@ -129,15 +117,4 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
throw new IllegalStateException("Failed to commit transaction operation", e);
}
}

private FlintMetadataLogEntry emptyLogEntry() {
return new FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState$.MODULE$.EMPTY(),
dataSourceName,
"");
}
}
Loading
Loading