From c3bbe63564ad4239b96c1258898c081e3ffd2ca0 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Mon, 15 Jul 2024 08:37:47 -0700 Subject: [PATCH] Separate metadata log entry data model and persistence (#406) * update metadata log entry model to be generic Signed-off-by: Sean Kao * move OS specific stuff away from log entry Signed-off-by: Sean Kao refactor: utils for OS log entry storage Signed-off-by: Sean Kao * remove index name from log entry Signed-off-by: Sean Kao * add log entry storage context Signed-off-by: Sean Kao * fix error Signed-off-by: Sean Kao * rename storage utils to OS converter and add test Signed-off-by: Sean Kao * update comment for log entry for OpenSearch Signed-off-by: Sean Kao * rename storageContext to properties Signed-off-by: Sean Kao * remove unused failLogEntry Signed-off-by: Sean Kao * use jackson log entry toJson Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- build.sbt | 60 ++++--- .../log/DefaultOptimisticTransaction.java | 35 +---- ...ntMetadataLogEntryOpenSearchConverter.java | 146 ++++++++++++++++++ .../storage/FlintOpenSearchMetadataLog.java | 53 +++++-- .../FlintOpenSearchMetadataLogService.java | 6 +- ...dataLogEntryOpenSearchConverterSuite.scala | 98 ++++++++++++ .../common/metadata/log/FlintMetadataLog.java | 2 + .../metadata/log/FlintMetadataLogEntry.scala | 135 ++++------------ .../opensearch/flint/spark/FlintSpark.scala | 4 +- .../ApplyFlintSparkCoveringIndexSuite.scala | 10 +- .../flint/OpenSearchTransactionSuite.scala | 6 +- .../flint/core/FlintMetadataLogITSuite.scala | 17 +- .../flint/core/FlintTransactionITSuite.scala | 29 ++-- .../spark/FlintSparkIndexJobITSuite.scala | 6 +- 14 files changed, 401 insertions(+), 206 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java create mode 100644 flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala diff --git a/build.sbt 
b/build.sbt index 2a4ee80d0..de4c6b25f 100644 --- a/build.sbt +++ b/build.sbt @@ -68,7 +68,20 @@ lazy val flintCore = (project in file("flint-core")) exclude ("com.fasterxml.jackson.core", "jackson-databind"), "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593" exclude("com.fasterxml.jackson.core", "jackson-databind"), - "software.amazon.awssdk" % "auth-crt" % "2.25.23" + "software.amazon.awssdk" % "auth-crt" % "2.25.23", + "org.scalactic" %% "scalactic" % "3.2.15" % "test", + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", + "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", + "org.mockito" % "mockito-core" % "4.6.1" % "test", + "org.mockito" % "mockito-inline" % "4.6.1" % "test", + "org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test", + "org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test", + "org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test", + "com.typesafe.play" %% "play-json" % "2.9.2" % "test", + "com.google.truth" % "truth" % "1.1.5" % "test", + "net.aichler" % "jupiter-interface" % "0.11.1" % Test ), libraryDependencies ++= deps(sparkVersion), publish / skip := true) @@ -81,6 +94,7 @@ lazy val flintData = (project in file("flint-data")) libraryDependencies ++= Seq( "org.json4s" %% "json4s-jackson" % "4.0.5", "org.json4s" %% "json4s-native" % "4.0.5", + "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test" ), @@ -90,13 +104,20 @@ lazy val flintData = (project in file("flint-data")) assembly / assemblyOption ~= { _.withIncludeScala(false) }, - assemblyMergeStrategy in assembly := { - case PathList("META-INF", xs @ _*) => MergeStrategy.discard - case x => MergeStrategy.first - } + assembly / assemblyMergeStrategy := { + case PathList(ps@_*) if ps.last endsWith 
("module-info.class") => + MergeStrategy.discard + case PathList("module-info.class") => MergeStrategy.discard + case PathList("META-INF", "versions", xs@_, "module-info.class") => + MergeStrategy.discard + case x => + val oldStrategy = (assembly / assemblyMergeStrategy).value + oldStrategy(x) + }, ) .enablePlugins(AssemblyPlugin) + lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) .settings( @@ -117,10 +138,9 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings - assembly / test := {}, - assembly / assemblyExcludedJars := { - val cp = (assembly / fullClasspath).value - cp.filter(_.data.getName.contains("test")) + assemblyPackageScala / assembleArtifact := false, + assembly / assemblyOption ~= { + _.withIncludeScala(false) }, assembly / assemblyMergeStrategy := { case PathList(ps @ _*) if ps.last endsWith ("module-info.class") => @@ -157,10 +177,9 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) Antlr4 / antlr4GenListener := true, Antlr4 / antlr4GenVisitor := true, // Assembly settings - assembly / test := {}, - assembly / assemblyExcludedJars := { - val cp = (assembly / fullClasspath).value - cp.filter(_.data.getName.contains("test")) + assemblyPackageScala / assembleArtifact := false, + assembly / assemblyOption ~= { + _.withIncludeScala(false) }, assembly / assemblyMergeStrategy := { case PathList(ps @ _*) if ps.last endsWith ("module-info.class") => @@ -229,10 +248,13 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application")) "org.mockito" %% "mockito-scala" % "1.16.42" % "test", "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test"), // Assembly settings - assembly / test := {}, - assembly / assemblyExcludedJars := { - val cp = (assembly / fullClasspath).value - cp.filter(_.data.getName.contains("test")) + // the sbt 
assembly plugin found multiple copies of the module-info.class file with + // different contents in the jars that it was merging flintCore dependencies. + // This can happen if you have multiple dependencies that include the same library, + // but with different versions. + assemblyPackageScala / assembleArtifact := false, + assembly / assemblyOption ~= { + _.withIncludeScala(false) }, assembly / assemblyMergeStrategy := { case PathList(ps@_*) if ps.last endsWith ("module-info.class") => @@ -243,7 +265,9 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application")) case x => val oldStrategy = (assembly / assemblyMergeStrategy).value oldStrategy(x) - }) + }, + assembly / test := (Test / test).value + ) lazy val sparkSqlApplicationCosmetic = project .settings( diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index 1992e173d..ed390d1cf 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -7,9 +7,6 @@ import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; -import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import java.util.Objects; import java.util.function.Function; @@ -30,11 +27,6 @@ public class DefaultOptimisticTransaction implements OptimisticTransaction private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName()); - /** - * Data source name. TODO: remove this in future. 
- */ - private final String dataSourceName; - /** * Flint metadata log */ @@ -44,10 +36,7 @@ public class DefaultOptimisticTransaction implements OptimisticTransaction private Function transientAction = null; private Function finalAction = null; - public DefaultOptimisticTransaction( - String dataSourceName, - FlintMetadataLog metadataLog) { - this.dataSourceName = dataSourceName; + public DefaultOptimisticTransaction(FlintMetadataLog metadataLog) { this.metadataLog = metadataLog; } @@ -79,7 +68,7 @@ public T commit(Function operation) { // Get the latest log and create if not exists FlintMetadataLogEntry latest = - metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry())); + metadataLog.getLatest().orElseGet(() -> metadataLog.add(metadataLog.emptyLogEntry())); // Perform initial log check if (!initialCondition.test(latest)) { @@ -93,15 +82,14 @@ public T commit(Function operation) { if (transientAction != null) { latest = metadataLog.add(transientAction.apply(latest)); - // Copy latest seqNo and primaryTerm to initialLog for potential rollback use + // Copy latest entryVersion to initialLog for potential rollback use initialLog = initialLog.copy( initialLog.id(), - latest.seqNo(), - latest.primaryTerm(), initialLog.createTime(), initialLog.state(), - initialLog.dataSource(), - initialLog.error()); + latest.entryVersion(), + initialLog.error(), + initialLog.properties()); } // Perform operation @@ -129,15 +117,4 @@ public T commit(Function operation) { throw new IllegalStateException("Failed to commit transaction operation"); } } - - private FlintMetadataLogEntry emptyLogEntry() { - return new FlintMetadataLogEntry( - "", - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - 0L, - IndexState$.MODULE$.EMPTY(), - dataSourceName, - ""); - } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java 
b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java new file mode 100644 index 000000000..0b78304d2 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java @@ -0,0 +1,146 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry; + +import java.util.Map; + +/** + * Utility class for handling Flint metadata log entries in OpenSearch storage. + */ +public class FlintMetadataLogEntryOpenSearchConverter { + + public static final String QUERY_EXECUTION_REQUEST_MAPPING = String.join("\n", + "{", + " \"dynamic\": false,", + " \"properties\": {", + " \"version\": {", + " \"type\": \"keyword\"", + " },", + " \"type\": {", + " \"type\": \"keyword\"", + " },", + " \"state\": {", + " \"type\": \"keyword\"", + " },", + " \"statementId\": {", + " \"type\": \"keyword\"", + " },", + " \"applicationId\": {", + " \"type\": \"keyword\"", + " },", + " \"sessionId\": {", + " \"type\": \"keyword\"", + " },", + " \"sessionType\": {", + " \"type\": \"keyword\"", + " },", + " \"error\": {", + " \"type\": \"text\"", + " },", + " \"lang\": {", + " \"type\": \"keyword\"", + " },", + " \"query\": {", + " \"type\": \"text\"", + " },", + " \"dataSourceName\": {", + " \"type\": \"keyword\"", + " },", + " \"submitTime\": {", + " \"type\": \"date\",", + " \"format\": \"strict_date_time||epoch_millis\"", + " },", + " \"jobId\": {", + " \"type\": \"keyword\"", + " },", + " \"lastUpdateTime\": {", + " \"type\": \"date\",", + " \"format\": \"strict_date_time||epoch_millis\"", + " },", + " \"queryId\": {", + " \"type\": \"keyword\"", + " },", + " \"excludeJobIds\": 
{", + " \"type\": \"keyword\"", + " }", + " }", + "}"); + + public static final String QUERY_EXECUTION_REQUEST_SETTINGS = String.join("\n", + "{", + " \"index\": {", + " \"number_of_shards\": \"1\",", + " \"auto_expand_replicas\": \"0-2\",", + " \"number_of_replicas\": \"0\"", + " }", + "}"); + + /** + * Convert a log entry to json string for persisting to OpenSearch. + * Expects the following field in properties: + * - dataSourceName: data source name + * + * @param logEntry + * log entry to convert + * @return + * json string representation of the log entry + */ + public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessingException { + String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown"); + String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", "unknown"); + long lastUpdateTime = System.currentTimeMillis(); + + ObjectMapper mapper = new ObjectMapper(); + ObjectNode json = mapper.createObjectNode(); + + json.put("version", "1.0"); + json.put("latestId", logEntry.id()); + json.put("type", "flintindexstate"); + json.put("state", logEntry.state().toString()); + json.put("applicationId", applicationId); + json.put("jobId", jobId); + json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString()); + json.put("jobStartTime", logEntry.createTime()); + json.put("lastUpdateTime", lastUpdateTime); + json.put("error", logEntry.error()); + + return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json); + } + + /** + * Construct a log entry from OpenSearch document fields. 
+ * + * @param id + * OpenSearch document id + * @param seqNo + * OpenSearch document sequence number + * @param primaryTerm + * OpenSearch document primary term + * @param sourceMap + * OpenSearch document source as a map + * @return + * log entry + */ + public static FlintMetadataLogEntry constructLogEntry( + String id, + Long seqNo, + Long primaryTerm, + Map sourceMap) { + return new FlintMetadataLogEntry( + id, + /* sourceMap may use Integer or Long even though it's always long in index mapping */ + ((Number) sourceMap.get("jobStartTime")).longValue(), + FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")), + Map.of("seqNo", seqNo, "primaryTerm", primaryTerm), + (String) sourceMap.get("error"), + Map.of("dataSourceName", (String) sourceMap.get("dataSourceName"))); + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index ebd719380..7944de5ae 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -6,10 +6,15 @@ package org.opensearch.flint.core.storage; import static java.util.logging.Level.SEVERE; +import static org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.constructLogEntry; +import static org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.toJson; import static org.opensearch.action.support.WriteRequest.RefreshPolicy; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import java.io.IOException; import java.util.Base64; +import java.util.Map; import java.util.Optional; import java.util.logging.Logger; import org.opensearch.OpenSearchException; @@ -31,6 +36,12 @@ /** * Flint 
metadata log in OpenSearch store. For now use single doc instead of maintaining history * of metadata log. + * Expects the following fields from maps in FlintMetadataLogEntry: + * - entryVersion: + * - seqNo (Long): OpenSearch sequence number + * - primaryTerm (Long): OpenSearch primary term + * - properties: + * - dataSourceName (String): OpenSearch data source associated */ public class FlintOpenSearchMetadataLog implements FlintMetadataLog { @@ -45,6 +56,7 @@ public class FlintOpenSearchMetadataLog implements FlintMetadataLog getLatest() { client.get(new GetRequest(metadataLogIndexName, latestId), RequestOptions.DEFAULT); if (response.isExists()) { - FlintMetadataLogEntry latest = new FlintMetadataLogEntry( + FlintMetadataLogEntry latest = constructLogEntry( response.getId(), response.getSeqNo(), response.getPrimaryTerm(), - response.getSourceAsMap()); + response.getSourceAsMap() + ); LOG.info("Found latest log entry " + latest); return Optional.of(latest); @@ -113,18 +127,28 @@ public void purge() { } } + @Override + public FlintMetadataLogEntry emptyLogEntry() { + return new FlintMetadataLogEntry( + "", + 0L, + FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(), + Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM), + "", + Map.of("dataSourceName", dataSourceName)); + } + private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here FlintMetadataLogEntry logEntryWithId = logEntry.copy( latestId, - logEntry.seqNo(), - logEntry.primaryTerm(), logEntry.createTime(), logEntry.state(), - logEntry.dataSource(), - logEntry.error()); + logEntry.entryVersion(), + logEntry.error(), + logEntry.properties()); return writeLogEntry(logEntryWithId, client -> client.index( @@ -132,7 +156,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { .index(metadataLogIndexName) .id(logEntryWithId.id()) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) - 
.source(logEntryWithId.toJson(), XContentType.JSON), + .source(toJson(logEntryWithId), XContentType.JSON), RequestOptions.DEFAULT)); } @@ -141,10 +165,10 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { return writeLogEntry(logEntry, client -> client.update( new UpdateRequest(metadataLogIndexName, logEntry.id()) - .doc(logEntry.toJson(), XContentType.JSON) + .doc(toJson(logEntry), XContentType.JSON) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) - .setIfSeqNo(logEntry.seqNo()) - .setIfPrimaryTerm(logEntry.primaryTerm()), + .setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get()) + .setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()), RequestOptions.DEFAULT)); } @@ -156,14 +180,13 @@ private FlintMetadataLogEntry writeLogEntry( DocWriteResponse response = write.apply(client); // Copy latest seqNo and primaryTerm after write - logEntry = logEntry.copy( + logEntry = new FlintMetadataLogEntry( logEntry.id(), - response.getSeqNo(), - response.getPrimaryTerm(), logEntry.createTime(), logEntry.state(), - logEntry.dataSource(), - logEntry.error()); + Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()), + logEntry.error(), + logEntry.properties()); LOG.info("Log entry written as " + logEntry); return logEntry; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index bd456d875..2a0971a6c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -48,7 +48,7 @@ public OptimisticTransaction startTransaction(String indexName, boolean f String errorMsg = "Metadata log index not found " + metadataLogIndexName; throw new IllegalStateException(errorMsg); } - return new 
DefaultOptimisticTransaction<>(dataSourceName, metadataLog.get()); + return new DefaultOptimisticTransaction<>(metadataLog.get()); } @Override @@ -88,8 +88,8 @@ private void initIndexMetadataLog() { LOG.info("Initializing metadata log index " + metadataLogIndexName); try (IRestHighLevelClient client = createOpenSearchClient()) { CreateIndexRequest request = new CreateIndexRequest(metadataLogIndexName); - request.mapping(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(), XContentType.JSON); - request.settings(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS(), XContentType.JSON); + request.mapping(FlintMetadataLogEntryOpenSearchConverter.QUERY_EXECUTION_REQUEST_MAPPING, XContentType.JSON); + request.settings(FlintMetadataLogEntryOpenSearchConverter.QUERY_EXECUTION_REQUEST_SETTINGS, XContentType.JSON); client.createIndex(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to initialize metadata log index " + metadataLogIndexName, e); diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala new file mode 100644 index 000000000..577dfc5fc --- /dev/null +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage + +import java.util.{Map => JMap} + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.mockito.Mockito.when +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock +import play.api.libs.json.{Json, JsValue} 
+ +class FlintMetadataLogEntryOpenSearchConverterTest + extends AnyFlatSpec + with BeforeAndAfter + with Matchers { + val mockLogEntry: FlintMetadataLogEntry = mock[FlintMetadataLogEntry] + + val sourceMap = JMap.of( + "jobStartTime", + 1234567890123L.asInstanceOf[Object], + "state", + "active".asInstanceOf[Object], + "dataSourceName", + "testDataSource".asInstanceOf[Object], + "error", + "".asInstanceOf[Object]) + + before { + when(mockLogEntry.id).thenReturn("id") + when(mockLogEntry.state).thenReturn(FlintMetadataLogEntry.IndexState.ACTIVE) + when(mockLogEntry.createTime).thenReturn(1234567890123L) + when(mockLogEntry.error).thenReturn("") + when(mockLogEntry.properties).thenReturn(Map("dataSourceName" -> "testDataSource")) + } + + it should "convert to json" in { + // Removing lastUpdateTime since System.currentTimeMillis() cannot be mocked + val expectedJsonWithoutLastUpdateTime = + s""" + |{ + | "version": "1.0", + | "latestId": "id", + | "type": "flintindexstate", + | "state": "active", + | "applicationId": "unknown", + | "jobId": "unknown", + | "dataSourceName": "testDataSource", + | "jobStartTime": 1234567890123, + | "error": "" + |} + |""".stripMargin + val actualJson = FlintMetadataLogEntryOpenSearchConverter.toJson(mockLogEntry) + removeJsonField(actualJson, "lastUpdateTime") should matchJson( + expectedJsonWithoutLastUpdateTime) + } + + it should "construct log entry" in { + val logEntry = + FlintMetadataLogEntryOpenSearchConverter.constructLogEntry("id", 1L, 1L, sourceMap) + logEntry shouldBe a[FlintMetadataLogEntry] + logEntry.id shouldBe "id" + logEntry.createTime shouldBe 1234567890123L + logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE + logEntry.error shouldBe "" + logEntry.properties.get("dataSourceName").get shouldBe "testDataSource" + } + + it should "construct log entry with integer jobStartTime value" in { + val testSourceMap = JMap.of( + "jobStartTime", + 1234567890.asInstanceOf[Object], // Integer instead of Long + "state", 
+ "active".asInstanceOf[Object], + "dataSourceName", + "testDataSource".asInstanceOf[Object], + "error", + "".asInstanceOf[Object]) + val logEntry = + FlintMetadataLogEntryOpenSearchConverter.constructLogEntry("id", 1L, 1L, testSourceMap) + logEntry shouldBe a[FlintMetadataLogEntry] + logEntry.id shouldBe "id" + logEntry.createTime shouldBe 1234567890 + logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE + logEntry.error shouldBe "" + logEntry.properties.get("dataSourceName").get shouldBe "testDataSource" + } + + private def removeJsonField(json: String, field: String): String = { + Json.stringify(Json.toJson(Json.parse(json).as[Map[String, JsValue]] - field)) + } +} diff --git a/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java b/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java index 8cd9bd6ea..2a62dd35f 100644 --- a/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java +++ b/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLog.java @@ -31,4 +31,6 @@ public interface FlintMetadataLog { * Remove all log entries. 
*/ void purge(); + + T emptyLogEntry(); } diff --git a/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala b/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala index 57fb2108f..a7391ed6a 100644 --- a/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-data/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala @@ -5,67 +5,56 @@ package org.opensearch.flint.common.metadata.log -import FlintMetadataLogEntry.IndexState -import FlintMetadataLogEntry.IndexState.IndexState +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ + +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState /** - * Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move - * implementation specific field, such as seqNo, primaryTerm, dataSource to properties. + * Flint metadata log entry. This is temporary and will merge field in FlintMetadata here. * * @param id * log entry id - * @param seqNo - * OpenSearch sequence number - * @param primaryTerm - * OpenSearch primary term * @param state * Flint index state - * @param dataSource - * OpenSearch data source associated //TODO: remove? + * @param entryVersion + * entry version fields for consistency control * @param error * error details if in error state + * @param properties + * extra properties fields */ case class FlintMetadataLogEntry( id: String, - seqNo: Long, - primaryTerm: Long, /** * This is currently used as streaming job start time. 
In future, this should represent the * create timestamp of the log entry */ createTime: Long, state: IndexState, - dataSource: String, - error: String) { + entryVersion: Map[String, Any], + error: String, + properties: Map[String, Any]) { - def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) { - this( - id, - seqNo, - primaryTerm, - /* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */ - map.get("jobStartTime").asInstanceOf[Number].longValue(), - IndexState.from(map.get("state").asInstanceOf[String]), - map.get("dataSourceName").asInstanceOf[String], - map.get("error").asInstanceOf[String]) + def this( + id: String, + createTime: Long, + state: IndexState, + entryVersion: JMap[String, Any], + error: String, + storageContext: JMap[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap) } - def toJson: String = { - // Implicitly populate latest appId, jobId and timestamp whenever persist - s""" - |{ - | "version": "1.0", - | "latestId": "$id", - | "type": "flintindexstate", - | "state": "$state", - | "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}", - | "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}", - | "dataSourceName": "$dataSource", - | "jobStartTime": $createTime, - | "lastUpdateTime": ${System.currentTimeMillis()}, - | "error": "$error" - |} - |""".stripMargin + def this( + id: String, + createTime: Long, + state: IndexState, + entryVersion: JMap[String, Any], + error: String, + storageContext: Map[String, Any]) = { + this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext) } } @@ -94,70 +83,4 @@ object FlintMetadataLogEntry { .getOrElse(IndexState.UNKNOWN) } } - - val QUERY_EXECUTION_REQUEST_MAPPING: String = - """{ - | "dynamic": false, - | "properties": { - | "version": { - | "type": "keyword" - | }, - | "type": { - | "type": "keyword" - | }, - | 
"state": { - | "type": "keyword" - | }, - | "statementId": { - | "type": "keyword" - | }, - | "applicationId": { - | "type": "keyword" - | }, - | "sessionId": { - | "type": "keyword" - | }, - | "sessionType": { - | "type": "keyword" - | }, - | "error": { - | "type": "text" - | }, - | "lang": { - | "type": "keyword" - | }, - | "query": { - | "type": "text" - | }, - | "dataSourceName": { - | "type": "keyword" - | }, - | "submitTime": { - | "type": "date", - | "format": "strict_date_time||epoch_millis" - | }, - | "jobId": { - | "type": "keyword" - | }, - | "lastUpdateTime": { - | "type": "date", - | "format": "strict_date_time||epoch_millis" - | }, - | "queryId": { - | "type": "keyword" - | }, - | "excludeJobIds": { - | "type": "keyword" - | } - | } - |}""".stripMargin - - val QUERY_EXECUTION_REQUEST_SETTINGS: String = - """{ - | "index": { - | "number_of_shards": "1", - | "auto_expand_replicas": "0-2", - | "number_of_replicas": "0" - | } - |}""".stripMargin } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 0ab24032d..351268e35 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -466,7 +466,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintMetadataLogService .startTransaction(indexName) .initialLog(latest => - latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) + latest.state == REFRESHING && latest.entryVersion == indexLogEntry.entryVersion) .transientLog(latest => latest.copy(state = UPDATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => { @@ -485,7 +485,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintMetadataLogService .startTransaction(indexName) .initialLog(latest => - 
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) + latest.state == ACTIVE && latest.entryVersion == indexLogEntry.entryVersion) .transientLog(latest => latest.copy(state = UPDATING, createTime = System.currentTimeMillis())) .finalLog(latest => { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 5231bdfa6..1c2a52c66 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -171,8 +171,14 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { def withIndex(index: FlintSparkCoveringIndex, state: IndexState = ACTIVE): AssertionHelper = { this.indexes = indexes :+ - index.copy(latestLogEntry = - Some(new FlintMetadataLogEntry("id", 0, 0, 0, state, "spark_catalog", ""))) + index.copy(latestLogEntry = Some( + new FlintMetadataLogEntry( + "id", + 0, + state, + Map("seqNo" -> 0, "primaryTerm" -> 0), + "", + Map("dataSourceName" -> "dataSource")))) this } diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index f18c4d3d1..74aa0be5d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -17,8 +17,8 @@ import org.opensearch.client.RequestOptions import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest} import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry -import 
org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.{QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState +import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.{toJson, QUERY_EXECUTION_REQUEST_MAPPING, QUERY_EXECUTION_REQUEST_SETTINGS} import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService.METADATA_LOG_INDEX_NAME_PREFIX import org.opensearch.flint.spark.FlintSparkSuite @@ -66,14 +66,14 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { new IndexRequest() .index(testMetaLogIndex) .id(latest.id) - .source(latest.toJson, XContentType.JSON), + .source(toJson(latest), XContentType.JSON), RequestOptions.DEFAULT) } def updateLatestLogEntry(latest: FlintMetadataLogEntry, newState: IndexState): Unit = { openSearchClient.update( new UpdateRequest(testMetaLogIndex, latest.id) - .doc(latest.copy(state = newState).toJson, XContentType.JSON), + .doc(toJson(latest.copy(state = newState)), XContentType.JSON), RequestOptions.DEFAULT) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 877b5ce79..eadc5031a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -28,13 +28,12 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) val testCreateTime = 1234567890123L val flintMetadataLogEntry = FlintMetadataLogEntry( - id = testLatestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, - createTime = testCreateTime, - state = ACTIVE, - dataSource = testDataSourceName, - error = "") + testLatestId, + testCreateTime, + 
ACTIVE, + Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), + "", + Map("dataSourceName" -> testDataSourceName)) var flintMetadataLogService: FlintMetadataLogService = _ @@ -87,8 +86,8 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { latest.isPresent shouldBe true latest.get.id shouldBe testLatestId latest.get.createTime shouldBe testCreateTime - latest.get.dataSource shouldBe testDataSourceName latest.get.error shouldBe "" + latest.get.properties.get("dataSourceName").get shouldBe testDataSourceName } test("should not get index metadata log if not exist") { @@ -125,7 +124,7 @@ case class TestMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLog forceInit: Boolean): OptimisticTransaction[T] = { val flintOptions = new FlintOptions(Map[String, String]().asJava) val metadataLog = new FlintOpenSearchMetadataLog(flintOptions, "", "") - new DefaultOptimisticTransaction("", metadataLog) + new DefaultOptimisticTransaction(metadataLog) } override def startTransaction[T](indexName: String): OptimisticTransaction[T] = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index d0bb7fa81..605e8e7fd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -40,8 +40,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.state shouldBe EMPTY latest.createTime shouldBe 0L - latest.dataSource shouldBe testDataSourceName latest.error shouldBe "" + latest.properties.get("dataSourceName").get shouldBe testDataSourceName true }) .finalLog(latest => latest) @@ -53,20 +53,19 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createLatestLogEntry( 
FlintMetadataLogEntry( id = testLatestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, createTime = testCreateTime, state = ACTIVE, - dataSource = testDataSourceName, - error = "")) + Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), + error = "", + Map("dataSourceName" -> testDataSourceName))) flintMetadataLogService .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId latest.createTime shouldBe testCreateTime - latest.dataSource shouldBe testDataSourceName latest.error shouldBe "" + latest.properties.get("dataSourceName").get shouldBe testDataSourceName true }) .transientLog(latest => latest.copy(state = DELETING)) @@ -74,8 +73,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .commit(latest => { latest.id shouldBe testLatestId latest.createTime shouldBe testCreateTime - latest.dataSource shouldBe testDataSourceName latest.error shouldBe "" + latest.properties.get("dataSourceName").get shouldBe testDataSourceName }) latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and @@ -113,12 +112,11 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createLatestLogEntry( FlintMetadataLogEntry( id = testLatestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, createTime = 1234567890123L, state = ACTIVE, - dataSource = testDataSourceName, - error = "")) + Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), + error = "", + Map("dataSourceName" -> testDataSourceName))) flintMetadataLogService .startTransaction(testFlintIndex) @@ -200,12 +198,11 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createLatestLogEntry( FlintMetadataLogEntry( id = testLatestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, createTime = 1234567890123L, state = ACTIVE, - dataSource = testDataSourceName, - error = "")) + 
Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), + error = "", + Map("dataSourceName" -> testDataSourceName))) the[IllegalStateException] thrownBy { flintMetadataLogService @@ -243,8 +240,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.state shouldBe EMPTY latest.createTime shouldBe 0L - latest.dataSource shouldBe testDataSourceName latest.error shouldBe "" + latest.properties.get("dataSourceName").get shouldBe testDataSourceName true }) .finalLog(latest => latest) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 0525a2896..f676d9ad5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -10,8 +10,8 @@ import java.util.Base64 import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter.constructLogEntry import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers @@ -69,7 +69,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers .create() updateLatestLogEntry( - new FlintMetadataLogEntry( + constructLogEntry( latestId, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, @@ -92,7 +92,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers .create() updateLatestLogEntry( - new 
FlintMetadataLogEntry( + constructLogEntry( latestId, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM,