
Commit

Separate metadata log entry data model and persistence (opensearch-project#406)

* update metadata log entry model to be generic

Signed-off-by: Sean Kao <[email protected]>

* move OS specific stuff away from log entry

Signed-off-by: Sean Kao <[email protected]>

* refactor: utils for OS log entry storage

Signed-off-by: Sean Kao <[email protected]>

* remove index name from log entry

Signed-off-by: Sean Kao <[email protected]>

* add log entry storage context

Signed-off-by: Sean Kao <[email protected]>

* fix error

Signed-off-by: Sean Kao <[email protected]>

* rename storage utils to OS converter and add test

Signed-off-by: Sean Kao <[email protected]>

* update comment for log entry for OpenSearch

Signed-off-by: Sean Kao <[email protected]>

* rename storageContext to properties

Signed-off-by: Sean Kao <[email protected]>

* remove unused failLogEntry

Signed-off-by: Sean Kao <[email protected]>

* use jackson log entry toJson

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
seankao-az committed Aug 6, 2024
1 parent a4126b8 commit c3bbe63
Showing 14 changed files with 401 additions and 206 deletions.
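
Before the file-by-file diffs: the core of this change is that FlintMetadataLogEntry no longer carries OpenSearch-specific fields. The seqNo/primaryTerm pair is generalized into an entryVersion map, and dataSource moves into a properties map, so the model stays storage-agnostic. Below is a minimal sketch of the resulting constructor shape, inferred from the constructor calls visible in the diffs that follow; the class name and values here are hypothetical placeholders.

```java
import java.util.Map;

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;

public class LogEntryShapeExample {
  public static void main(String[] args) {
    // Argument order inferred from constructLogEntry in the new converter below.
    FlintMetadataLogEntry entry = new FlintMetadataLogEntry(
        "latestId",                                         // id
        1722902400000L,                                     // createTime (epoch millis)
        FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),  // state
        Map.of("seqNo", 0L, "primaryTerm", 1L),             // entryVersion: storage-specific versioning
        "",                                                 // error
        Map.of("dataSourceName", "mys3"));                  // properties: storage-specific context
    System.out.println(entry);
  }
}
```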
build.sbt (42 additions, 18 deletions)

@@ -68,7 +68,20 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "auth-crt" % "2.25.23"
"software.amazon.awssdk" % "auth-crt" % "2.25.23",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.mockito" % "mockito-core" % "4.6.1" % "test",
"org.mockito" % "mockito-inline" % "4.6.1" % "test",
"org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test",
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test",
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test",
"com.typesafe.play" %% "play-json" % "2.9.2" % "test",
"com.google.truth" % "truth" % "1.1.5" % "test",
"net.aichler" % "jupiter-interface" % "0.11.1" % Test
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)
@@ -81,6 +94,7 @@ lazy val flintData = (project in file("flint-data"))
libraryDependencies ++= Seq(
"org.json4s" %% "json4s-jackson" % "4.0.5",
"org.json4s" %% "json4s-native" % "4.0.5",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test"
),
@@ -90,13 +104,20 @@ lazy val flintData = (project in file("flint-data"))
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
-assemblyMergeStrategy in assembly := {
-case PathList("META-INF", xs @ _*) => MergeStrategy.discard
-case x => MergeStrategy.first
-}
+assembly / assemblyMergeStrategy := {
+case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
+MergeStrategy.discard
+case PathList("module-info.class") => MergeStrategy.discard
+case PathList("META-INF", "versions", xs@_, "module-info.class") =>
+MergeStrategy.discard
+case x =>
+val oldStrategy = (assembly / assemblyMergeStrategy).value
+oldStrategy(x)
+},
)
.enablePlugins(AssemblyPlugin)


lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
@@ -117,10 +138,9 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
-assembly / test := {},
-assembly / assemblyExcludedJars := {
-val cp = (assembly / fullClasspath).value
-cp.filter(_.data.getName.contains("test"))
+assemblyPackageScala / assembleArtifact := false,
+assembly / assemblyOption ~= {
+_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith ("module-info.class") =>
@@ -157,10 +177,9 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
Antlr4 / antlr4GenListener := true,
Antlr4 / antlr4GenVisitor := true,
// Assembly settings
-assembly / test := {},
-assembly / assemblyExcludedJars := {
-val cp = (assembly / fullClasspath).value
-cp.filter(_.data.getName.contains("test"))
+assemblyPackageScala / assembleArtifact := false,
+assembly / assemblyOption ~= {
+_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith ("module-info.class") =>
@@ -229,10 +248,13 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
"org.mockito" %% "mockito-scala" % "1.16.42" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test"),
// Assembly settings
-assembly / test := {},
-assembly / assemblyExcludedJars := {
-val cp = (assembly / fullClasspath).value
-cp.filter(_.data.getName.contains("test"))
+// The sbt assembly plugin found multiple copies of module-info.class with
+// different contents in the jars it was merging for flintCore dependencies.
+// This can happen when multiple dependencies pull in the same library at
+// different versions.
+assemblyPackageScala / assembleArtifact := false,
+assembly / assemblyOption ~= {
+_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
@@ -243,7 +265,9 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
-})
+},
+assembly / test := (Test / test).value
+)

lazy val sparkSqlApplicationCosmetic = project
.settings(

DefaultOptimisticTransaction.java

@@ -7,9 +7,6 @@

import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
-import static org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState$;
-import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
-import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.util.Objects;
import java.util.function.Function;
@@ -30,11 +27,6 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>

private static final Logger LOG = Logger.getLogger(DefaultOptimisticTransaction.class.getName());

-/**
- * Data source name. TODO: remove this in future.
- */
-private final String dataSourceName;
-
/**
* Flint metadata log
*/
@@ -44,10 +36,7 @@ public class DefaultOptimisticTransaction<T> implements OptimisticTransaction<T>
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> transientAction = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

-public DefaultOptimisticTransaction(
-String dataSourceName,
-FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
-this.dataSourceName = dataSourceName;
+public DefaultOptimisticTransaction(FlintMetadataLog<FlintMetadataLogEntry> metadataLog) {
this.metadataLog = metadataLog;
}

@@ -79,7 +68,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {

// Get the latest log and create if not exists
FlintMetadataLogEntry latest =
-metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));
+metadataLog.getLatest().orElseGet(() -> metadataLog.add(metadataLog.emptyLogEntry()));

// Perform initial log check
if (!initialCondition.test(latest)) {
@@ -93,15 +82,14 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (transientAction != null) {
latest = metadataLog.add(transientAction.apply(latest));

-// Copy latest seqNo and primaryTerm to initialLog for potential rollback use
+// Copy latest entryVersion to initialLog for potential rollback use
initialLog = initialLog.copy(
initialLog.id(),
-latest.seqNo(),
-latest.primaryTerm(),
initialLog.createTime(),
initialLog.state(),
-initialLog.dataSource(),
-initialLog.error());
+latest.entryVersion(),
+initialLog.error(),
+initialLog.properties());
}

// Perform operation
@@ -129,15 +117,4 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
throw new IllegalStateException("Failed to commit transaction operation");
}
}

-private FlintMetadataLogEntry emptyLogEntry() {
-return new FlintMetadataLogEntry(
-"",
-UNASSIGNED_SEQ_NO,
-UNASSIGNED_PRIMARY_TERM,
-0L,
-IndexState$.MODULE$.EMPTY(),
-dataSourceName,
-"");
-}
}
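
Note that the generic transaction now asks the storage layer for its empty entry (metadataLog.emptyLogEntry() above) instead of building one from OpenSearch constants. The following is a hypothetical sketch of how an OpenSearch-backed FlintMetadataLog might reassemble the removed helper; the class name and field are assumptions, since the actual implementation is among the files not shown on this page.

```java
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.util.Map;

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;

// Hypothetical sketch, not part of this diff.
class OpenSearchMetadataLogSketch {
  private final String dataSourceName = "mys3"; // assumption: the log knows its data source

  public FlintMetadataLogEntry emptyLogEntry() {
    return new FlintMetadataLogEntry(
        "",                                                 // id: assigned when first persisted
        0L,                                                 // createTime
        FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(),  // state
        Map.of("seqNo", UNASSIGNED_SEQ_NO,                  // entryVersion: unassigned OpenSearch
            "primaryTerm", UNASSIGNED_PRIMARY_TERM),        // concurrency-control values
        "",                                                 // error
        Map.of("dataSourceName", dataSourceName));          // properties: dataSourceName lives here now
  }
}
```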

FlintMetadataLogEntryOpenSearchConverter.java (new file)
@@ -0,0 +1,146 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;

import java.util.Map;

/**
* Utility class for handling Flint metadata log entries in OpenSearch storage.
*/
public class FlintMetadataLogEntryOpenSearchConverter {

public static final String QUERY_EXECUTION_REQUEST_MAPPING = String.join("\n",
"{",
" \"dynamic\": false,",
" \"properties\": {",
" \"version\": {",
" \"type\": \"keyword\"",
" },",
" \"type\": {",
" \"type\": \"keyword\"",
" },",
" \"state\": {",
" \"type\": \"keyword\"",
" },",
" \"statementId\": {",
" \"type\": \"keyword\"",
" },",
" \"applicationId\": {",
" \"type\": \"keyword\"",
" },",
" \"sessionId\": {",
" \"type\": \"keyword\"",
" },",
" \"sessionType\": {",
" \"type\": \"keyword\"",
" },",
" \"error\": {",
" \"type\": \"text\"",
" },",
" \"lang\": {",
" \"type\": \"keyword\"",
" },",
" \"query\": {",
" \"type\": \"text\"",
" },",
" \"dataSourceName\": {",
" \"type\": \"keyword\"",
" },",
" \"submitTime\": {",
" \"type\": \"date\",",
" \"format\": \"strict_date_time||epoch_millis\"",
" },",
" \"jobId\": {",
" \"type\": \"keyword\"",
" },",
" \"lastUpdateTime\": {",
" \"type\": \"date\",",
" \"format\": \"strict_date_time||epoch_millis\"",
" },",
" \"queryId\": {",
" \"type\": \"keyword\"",
" },",
" \"excludeJobIds\": {",
" \"type\": \"keyword\"",
" }",
" }",
"}");

public static final String QUERY_EXECUTION_REQUEST_SETTINGS = String.join("\n",
"{",
" \"index\": {",
" \"number_of_shards\": \"1\",",
" \"auto_expand_replicas\": \"0-2\",",
" \"number_of_replicas\": \"0\"",
" }",
"}");

/**
* Convert a log entry to json string for persisting to OpenSearch.
* Expects the following field in properties:
* - dataSourceName: data source name
*
* @param logEntry
* log entry to convert
* @return
* json string representation of the log entry
*/
public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessingException {
String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown");
String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", "unknown");
long lastUpdateTime = System.currentTimeMillis();

ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();

json.put("version", "1.0");
json.put("latestId", logEntry.id());
json.put("type", "flintindexstate");
json.put("state", logEntry.state().toString());
json.put("applicationId", applicationId);
json.put("jobId", jobId);
json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString());
json.put("jobStartTime", logEntry.createTime());
json.put("lastUpdateTime", lastUpdateTime);
json.put("error", logEntry.error());

return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
}

/**
* Construct a log entry from OpenSearch document fields.
*
* @param id
* OpenSearch document id
* @param seqNo
* OpenSearch document sequence number
* @param primaryTerm
* OpenSearch document primary term
* @param sourceMap
* OpenSearch document source as a map
* @return
* log entry
*/
public static FlintMetadataLogEntry constructLogEntry(
String id,
Long seqNo,
Long primaryTerm,
Map<String, Object> sourceMap) {
return new FlintMetadataLogEntry(
id,
/* sourceMap may use Integer or Long even though it's always long in index mapping */
((Number) sourceMap.get("jobStartTime")).longValue(),
FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")),
Map.of("seqNo", seqNo, "primaryTerm", primaryTerm),
(String) sourceMap.get("error"),
Map.of("dataSourceName", (String) sourceMap.get("dataSourceName")));
}
}
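
A short usage sketch of the converter's round trip, with hypothetical document values ("active" is assumed here to be a valid Flint index state string):

```java
import java.util.Map;

import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.storage.FlintMetadataLogEntryOpenSearchConverter;

public class ConverterRoundTrip {
  public static void main(String[] args) throws Exception {
    // Hypothetical _source of a log entry document fetched from OpenSearch.
    Map<String, Object> sourceMap = Map.of(
        "jobStartTime", 1722902400000L,
        "state", "active",
        "error", "",
        "dataSourceName", "mys3");

    // seqNo and primaryTerm come from the OpenSearch response metadata, not _source.
    FlintMetadataLogEntry entry =
        FlintMetadataLogEntryOpenSearchConverter.constructLogEntry("latestId", 3L, 1L, sourceMap);

    // Serialize back to the JSON shape persisted in the request index.
    String json = FlintMetadataLogEntryOpenSearchConverter.toJson(entry);
    System.out.println(json);
  }
}
```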
(Diffs for the remaining 11 changed files are not shown.)