Skip to content

Commit

Permalink
add log entry storage context
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jun 30, 2024
1 parent d0adefc commit 37727fe
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
* entry version fields for consistency control
* @param error
* error details if in error state
* @param storageContext
* extra context fields required for storage
*/
case class FlintMetadataLogEntry(
id: String,
Expand All @@ -31,16 +33,28 @@ case class FlintMetadataLogEntry(
*/
createTime: Long,
state: IndexState,
entryVersion: Map[String, _],
error: String) {
entryVersion: Map[String, Any],
error: String,
storageContext: Map[String, Any]) {

def this(
id: String,
createTime: Long,
state: IndexState,
entryVersion: JMap[String, _],
error: String) {
this(id, createTime, state, entryVersion.asScala.toMap, error)
entryVersion: JMap[String, Any],
error: String,
storageContext: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap)
}

def this(
id: String,
createTime: Long,
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
initialLog.createTime(),
initialLog.state(),
latest.entryVersion(),
initialLog.error());
initialLog.error(),
initialLog.storageContext());
}

// Perform operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public FlintMetadataLogEntry emptyLogEntry() {
0L,
FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),
Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM),
"");
"",
Map.of("dataSourceName", dataSourceName));
}

public FlintMetadataLogEntry failLogEntry(String error) {
Expand All @@ -137,7 +138,8 @@ public FlintMetadataLogEntry failLogEntry(String error) {
0L,
FlintMetadataLogEntry.IndexState$.MODULE$.FAILED(),
Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM),
error);
error,
Map.of("dataSourceName", dataSourceName));
}

private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
Expand All @@ -149,15 +151,16 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
logEntry.createTime(),
logEntry.state(),
logEntry.entryVersion(),
logEntry.error());
logEntry.error(),
logEntry.storageContext());

return writeLogEntry(logEntryWithId,
client -> client.index(
new IndexRequest()
.index(metadataLogIndexName)
.id(logEntryWithId.id())
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.source(toJson(logEntryWithId, dataSourceName), XContentType.JSON),
.source(toJson(logEntryWithId), XContentType.JSON),
RequestOptions.DEFAULT));
}

Expand All @@ -166,7 +169,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
return writeLogEntry(logEntry,
client -> client.update(
new UpdateRequest(metadataLogIndexName, logEntry.id())
.doc(toJson(logEntry, dataSourceName), XContentType.JSON)
.doc(toJson(logEntry), XContentType.JSON)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get())
.setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()),
Expand All @@ -186,7 +189,8 @@ private FlintMetadataLogEntry writeLogEntry(
logEntry.createTime(),
logEntry.state(),
Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()),
logEntry.error());
logEntry.error(),
logEntry.storageContext());

LOG.info("Log entry written as " + logEntry);
return logEntry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,17 @@ public class FlintOpenSearchMetadataLogEntryStorageUtils {
" }",
"}");

// TODO: move dataSourceName to entry
public static String toJson(FlintMetadataLogEntry logEntry, String dataSourceName) {
/**
* Convert a log entry to json string for persisting to OpenSearch.
* Expects the following field in storage context:
* - dataSourceName: data source name
*
* @param logEntry
* log entry to convert
* @return
* json string representation of the log entry
*/
public static String toJson(FlintMetadataLogEntry logEntry) {
String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown");
String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", "unknown");
long lastUpdateTime = System.currentTimeMillis();
Expand All @@ -96,20 +105,35 @@ public static String toJson(FlintMetadataLogEntry logEntry, String dataSourceNam
" \"lastUpdateTime\": %d,\n" +
" \"error\": \"%s\"\n" +
"}",
logEntry.id(), logEntry.state(), applicationId, jobId, dataSourceName, logEntry.createTime(), lastUpdateTime, logEntry.error());
logEntry.id(), logEntry.state(), applicationId, jobId, logEntry.storageContext().get("dataSourceName").get(), logEntry.createTime(), lastUpdateTime, logEntry.error());
}

/**
* Construct a log entry from OpenSearch document fields.
*
* @param id
* OpenSearch document id
* @param seqNo
* OpenSearch document sequence number
* @param primaryTerm
* OpenSearch document primary term
* @param sourceMap
* OpenSearch document source as a map
* @return
* log entry
*/
public static FlintMetadataLogEntry constructLogEntry(
String id,
Long seqNo,
Long primaryTerm,
Map<String, Object> sourceMap) {
return new FlintMetadataLogEntry(
id,
/* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */
/* sourceMap may use Integer or Long even though it's always long in index mapping */
(Long) sourceMap.get("jobStartTime"),
FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")),
Map.of("seqNo", seqNo, "primaryTerm", primaryTerm),
(String) sourceMap.get("error"));
(String) sourceMap.get("error"),
Map.of("dataSourceName", (String) sourceMap.get("dataSourceName")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,13 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
def withIndex(index: FlintSparkCoveringIndex, state: IndexState = ACTIVE): AssertionHelper = {
this.indexes = indexes :+
index.copy(latestLogEntry = Some(
new FlintMetadataLogEntry("id", 0, state, Map("seqNo" -> 0, "primaryTerm" -> 0), "")))
new FlintMetadataLogEntry(
"id",
0,
state,
Map("seqNo" -> 0, "primaryTerm" -> 0),
"",
Map("dataSourceName" -> "dataSource"))))
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite {
new IndexRequest()
.index(testMetaLogIndex)
.id(latest.id)
.source(toJson(latest, testDataSourceName), XContentType.JSON),
.source(toJson(latest), XContentType.JSON),
RequestOptions.DEFAULT)
}

def updateLatestLogEntry(latest: FlintMetadataLogEntry, newState: IndexState): Unit = {
openSearchClient.update(
new UpdateRequest(testMetaLogIndex, latest.id)
.doc(toJson(latest.copy(state = newState), testDataSourceName), XContentType.JSON),
.doc(toJson(latest.copy(state = newState)), XContentType.JSON),
RequestOptions.DEFAULT)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers {
testCreateTime,
ACTIVE,
Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM),
"")
"",
Map("dataSourceName" -> testDataSourceName))

var flintMetadataLogService: FlintMetadataLogService = _

Expand Down Expand Up @@ -86,6 +87,7 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers {
latest.get.id shouldBe testLatestId
latest.get.createTime shouldBe testCreateTime
latest.get.error shouldBe ""
latest.get.storageContext.get("dataSourceName").get shouldBe testDataSourceName
}

test("should not get index metadata log if not exist") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
latest.state shouldBe EMPTY
latest.createTime shouldBe 0L
latest.error shouldBe ""
latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName
true
})
.finalLog(latest => latest)
Expand All @@ -55,14 +56,16 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
createTime = testCreateTime,
state = ACTIVE,
Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM),
error = ""))
error = "",
Map("dataSourceName" -> testDataSourceName)))

flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.id shouldBe testLatestId
latest.createTime shouldBe testCreateTime
latest.error shouldBe ""
latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName
true
})
.transientLog(latest => latest.copy(state = DELETING))
Expand All @@ -71,6 +74,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
latest.id shouldBe testLatestId
latest.createTime shouldBe testCreateTime
latest.error shouldBe ""
latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName
})

latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and
Expand Down Expand Up @@ -111,7 +115,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
createTime = 1234567890123L,
state = ACTIVE,
Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM),
error = ""))
error = "",
Map("dataSourceName" -> testDataSourceName)))

flintMetadataLogService
.startTransaction(testFlintIndex)
Expand Down Expand Up @@ -196,7 +201,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
createTime = 1234567890123L,
state = ACTIVE,
Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM),
error = ""))
error = "",
Map("dataSourceName" -> testDataSourceName)))

the[IllegalStateException] thrownBy {
flintMetadataLogService
Expand Down Expand Up @@ -235,6 +241,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
latest.state shouldBe EMPTY
latest.createTime shouldBe 0L
latest.error shouldBe ""
latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName
true
})
.finalLog(latest => latest)
Expand Down

0 comments on commit 37727fe

Please sign in to comment.