diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index 409021486..07f78fff3 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -22,6 +22,8 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState * entry version fields for consistency control * @param error * error details if in error state + * @param storageContext + * extra context fields required for storage */ case class FlintMetadataLogEntry( id: String, @@ -31,16 +33,28 @@ case class FlintMetadataLogEntry( */ createTime: Long, state: IndexState, - entryVersion: Map[String, _], - error: String) { + entryVersion: Map[String, Any], + error: String, + storageContext: Map[String, Any]) { def this( id: String, createTime: Long, state: IndexState, - entryVersion: JMap[String, _], - error: String) { - this(id, createTime, state, entryVersion.asScala.toMap, error) + entryVersion: JMap[String, Any], + error: String, + storageContext: JMap[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap) + } + + def this( + id: String, + createTime: Long, + state: IndexState, + entryVersion: JMap[String, Any], + error: String, + storageContext: Map[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext) } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 6c4f34ecd..19fc05bc8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -88,7 +88,8 @@ public T commit(Function operation) { initialLog.createTime(), initialLog.state(), latest.entryVersion(), - initialLog.error()); + initialLog.error(), + initialLog.storageContext()); } // Perform operation diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 963e4b3bf..2e8cc7a39 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -128,7 +128,8 @@ public FlintMetadataLogEntry emptyLogEntry() { 0L, FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(), Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM), - ""); + "", + Map.of("dataSourceName", dataSourceName)); } public FlintMetadataLogEntry failLogEntry(String error) { @@ -137,7 +138,8 @@ public FlintMetadataLogEntry failLogEntry(String error) { 0L, FlintMetadataLogEntry.IndexState$.MODULE$.FAILED(), Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM), - error); + error, + Map.of("dataSourceName", dataSourceName)); } private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { @@ -149,7 +151,8 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { logEntry.createTime(), logEntry.state(), logEntry.entryVersion(), - logEntry.error()); + logEntry.error(), + logEntry.storageContext()); return writeLogEntry(logEntryWithId, client -> client.index( @@ -157,7 +160,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { .index(metadataLogIndexName) .id(logEntryWithId.id()) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) - .source(toJson(logEntryWithId, dataSourceName), XContentType.JSON), + .source(toJson(logEntryWithId), XContentType.JSON), RequestOptions.DEFAULT)); } @@ -166,7 +169,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { return writeLogEntry(logEntry, client -> client.update( new UpdateRequest(metadataLogIndexName, logEntry.id()) - .doc(toJson(logEntry, dataSourceName), XContentType.JSON) + .doc(toJson(logEntry), XContentType.JSON) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get()) .setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()), @@ -186,7 +189,8 @@ private FlintMetadataLogEntry writeLogEntry( logEntry.createTime(), logEntry.state(), Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()), - logEntry.error()); + logEntry.error(), + logEntry.storageContext()); LOG.info("Log entry written as " + logEntry); return logEntry; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogEntryStorageUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogEntryStorageUtils.java index 52215a7ef..0f969c75a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogEntryStorageUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogEntryStorageUtils.java @@ -77,8 +77,17 @@ public class FlintOpenSearchMetadataLogEntryStorageUtils { " }", "}"); - // TODO: move dataSourceName to entry - public static String toJson(FlintMetadataLogEntry logEntry, String dataSourceName) { + /** + * Convert a log entry to json string for persisting to OpenSearch. + * Expects the following field in storage context: + * - dataSourceName: data source name + * + * @param logEntry + * log entry to convert + * @return + * json string representation of the log entry + */ + public static String toJson(FlintMetadataLogEntry logEntry) { String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"); String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", "unknown"); long lastUpdateTime = System.currentTimeMillis(); @@ -96,9 +105,23 @@ public static String toJson(FlintMetadataLogEntry logEntry, String dataSourceNam " \"lastUpdateTime\": %d,\n" + " \"error\": \"%s\"\n" + "}", - logEntry.id(), logEntry.state(), applicationId, jobId, dataSourceName, logEntry.createTime(), lastUpdateTime, logEntry.error()); + logEntry.id(), logEntry.state(), applicationId, jobId, logEntry.storageContext().get("dataSourceName").get(), logEntry.createTime(), lastUpdateTime, logEntry.error()); } + /** + * Construct a log entry from OpenSearch document fields. + * + * @param id + * OpenSearch document id + * @param seqNo + * OpenSearch document sequence number + * @param primaryTerm + * OpenSearch document primary term + * @param sourceMap + * OpenSearch document source as a map + * @return + * log entry + */ public static FlintMetadataLogEntry constructLogEntry( String id, Long seqNo, @@ -106,10 +129,11 @@ public static FlintMetadataLogEntry constructLogEntry( Map sourceMap) { return new FlintMetadataLogEntry( id, - /* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */ + /* sourceMap may use Integer or Long even though it's always long in index mapping */ (Long) sourceMap.get("jobStartTime"), FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")), Map.of("seqNo", seqNo, "primaryTerm", primaryTerm), - (String) sourceMap.get("error")); + (String) sourceMap.get("error"), + Map.of("dataSourceName", (String) sourceMap.get("dataSourceName"))); } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 97524cc44..917d5aee7 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -174,7 +174,13 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { def withIndex(index: FlintSparkCoveringIndex, state: IndexState = ACTIVE): AssertionHelper = { this.indexes = indexes :+ index.copy(latestLogEntry = Some( - new FlintMetadataLogEntry("id", 0, state, Map("seqNo" -> 0, "primaryTerm" -> 0), ""))) + new FlintMetadataLogEntry( + "id", + 0, + state, + Map("seqNo" -> 0, "primaryTerm" -> 0), + "", + Map("dataSourceName" -> "dataSource")))) this } diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index 952a10a0e..0c2ec22ff 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -66,14 +66,14 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { new IndexRequest() .index(testMetaLogIndex) .id(latest.id) - .source(toJson(latest, testDataSourceName), XContentType.JSON), + .source(toJson(latest), XContentType.JSON), RequestOptions.DEFAULT) } def updateLatestLogEntry(latest: FlintMetadataLogEntry, newState: IndexState): Unit = { openSearchClient.update( new UpdateRequest(testMetaLogIndex, latest.id) - .doc(toJson(latest.copy(state = newState), testDataSourceName), XContentType.JSON), + .doc(toJson(latest.copy(state = newState)), XContentType.JSON), RequestOptions.DEFAULT) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 01da48946..628e2c614 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -32,7 +32,8 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { testCreateTime, ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), - "") + "", + Map("dataSourceName" -> testDataSourceName)) var flintMetadataLogService: FlintMetadataLogService = _ @@ -86,6 +87,7 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { latest.get.id shouldBe testLatestId latest.get.createTime shouldBe testCreateTime latest.get.error shouldBe "" + latest.get.storageContext.get("dataSourceName").get shouldBe testDataSourceName } test("should not get index metadata log if not exist") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 67cfc727d..3c2152dd1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -41,6 +41,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.state shouldBe EMPTY latest.createTime shouldBe 0L latest.error shouldBe "" + latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName true }) .finalLog(latest => latest) @@ -55,7 +56,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createTime = testCreateTime, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), - error = "")) + error = "", + Map("dataSourceName" -> testDataSourceName))) flintMetadataLogService .startTransaction(testFlintIndex) @@ -63,6 +65,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.createTime shouldBe testCreateTime latest.error shouldBe "" + latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName true }) .transientLog(latest => latest.copy(state = DELETING)) @@ -71,6 +74,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.createTime shouldBe testCreateTime latest.error shouldBe "" + latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName }) latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and @@ -111,7 +115,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createTime = 1234567890123L, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), - error = "")) + error = "", + Map("dataSourceName" -> testDataSourceName))) flintMetadataLogService .startTransaction(testFlintIndex) @@ -196,7 +201,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createTime = 1234567890123L, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), - error = "")) + error = "", + Map("dataSourceName" -> testDataSourceName))) the[IllegalStateException] thrownBy { flintMetadataLogService @@ -235,6 +241,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.state shouldBe EMPTY latest.createTime shouldBe 0L latest.error shouldBe "" + latest.storageContext.get("dataSourceName").get shouldBe testDataSourceName true }) .finalLog(latest => latest)