Merge branch 'main' into improve-error-handling
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jun 24, 2024
2 parents 1c3078c + 0f53448 commit cca8711
Showing 51 changed files with 1,195 additions and 648 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -1,8 +1,9 @@
# OpenSearch Flint

OpenSearch Flint is ... It consists of two modules:
OpenSearch Flint is ... It consists of four modules:

- `flint-core`: a module that contains Flint specification and client.
- `flint-commons`: a module that provides a shared library of utilities and common functionalities, designed to easily extend Flint's capabilities.
- `flint-spark-integration`: a module that provides Spark integration for Flint and derived dataset based on it.
- `ppl-spark-integration`: a module that provides PPL query execution on top of Spark. See [PPL repository](https://github.com/opensearch-project/piped-processing-language).

38 changes: 35 additions & 3 deletions build.sbt
@@ -48,12 +48,13 @@ lazy val commonSettings = Seq(

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

lazy val flintCore = (project in file("flint-core"))
.disablePlugins(AssemblyPlugin)
.dependsOn(flintCommons)
.settings(
commonSettings,
name := "flint-core",
@@ -84,6 +85,37 @@ lazy val flintCore = (project in file("flint-core"))
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)

lazy val flintCommons = (project in file("flint-commons"))
.settings(
commonSettings,
name := "flint-commons",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
assembly / test := (Test / test).value,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
)
.enablePlugins(AssemblyPlugin)


lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
@@ -121,7 +153,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
assembly / test := (Test / test).value)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
.dependsOn(flintCore, flintCommons)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
@@ -166,7 +198,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.dependsOn(flintCommons % "test->test", flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

@@ -3,11 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log
package org.opensearch.flint.common.metadata.log

import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.IndexState

/**
* Flint metadata log entry. This is temporary and will merge field in FlintMetadata here and move
@@ -161,14 +160,4 @@ object FlintMetadataLogEntry {
| "number_of_replicas": "0"
| }
|}""".stripMargin

def failLogEntry(dataSourceName: String, error: String): FlintMetadataLogEntry =
FlintMetadataLogEntry(
"",
UNASSIGNED_SEQ_NO,
UNASSIGNED_PRIMARY_TERM,
0L,
IndexState.FAILED,
dataSourceName,
error)
}
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.metadata.log;

import java.util.Optional;

/**
* Flint metadata log service provides an API for metadata log related operations on a Flint index
* regardless of the underlying storage.
* <p>
* Custom implementations of this interface are expected to provide a public constructor with
* the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by
* the FlintMetadataLogServiceBuilder.
*/
public interface FlintMetadataLogService {

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @param forceInit force init transaction and create an empty metadata log if it does not exist
* @return transaction handle
*/
<T> OptimisticTransaction<T> startTransaction(String indexName, boolean forceInit);

/**
* Start a new optimistic transaction.
*
* @param indexName index name
* @return transaction handle
*/
default <T> OptimisticTransaction<T> startTransaction(String indexName) {
return startTransaction(indexName, false);
}

/**
* Get metadata log for index.
*
* @param indexName index name
* @return optional metadata log
*/
Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName);

/**
* Record heartbeat timestamp for index streaming job.
*
* @param indexName index name
*/
void recordHeartbeat(String indexName);
}
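
A minimal sketch of a custom implementation satisfying the constructor contract described in the Javadoc above. The class name and no-op bodies are hypothetical and not part of this commit; the sketch only assumes the types shown in this diff under `org.opensearch.flint.common.metadata.log` and that `FlintMetadataLogServiceBuilder` instantiates the class via its public `(SparkConf)` constructor, as the Javadoc states.

```scala
import java.util.Optional

import org.apache.spark.SparkConf
import org.opensearch.flint.common.metadata.log.{
  FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, OptimisticTransaction}

// Hypothetical custom service; a real implementation would be backed by its own storage.
class InMemoryMetadataLogService(sparkConf: SparkConf) extends FlintMetadataLogService {

  // Start a transaction for the given index; left unimplemented in this sketch.
  override def startTransaction[T](indexName: String, forceInit: Boolean): OptimisticTransaction[T] =
    throw new UnsupportedOperationException("sketch only")

  // This sketch tracks no metadata log for any index.
  override def getIndexMetadataLog(indexName: String): Optional[FlintMetadataLog[FlintMetadataLogEntry]] =
    Optional.empty()

  // Heartbeats are ignored in this sketch.
  override def recordHeartbeat(indexName: String): Unit = ()
}
```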
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata.log;
package org.opensearch.flint.common.metadata.log;

import java.util.function.Function;
import java.util.function.Predicate;
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.data

/**
* Provides a mutable map to store and retrieve contextual data using key-value pairs.
*/
trait ContextualDataStore {

/** Holds the contextual data as key-value pairs. */
var context: Map[String, Any] = Map.empty

/**
* Adds a key-value pair to the context map.
*/
def setContextValue(key: String, value: Any): Unit = {
context += (key -> value)
}

/**
* Retrieves the value associated with a key from the context map.
*/
def getContextValue(key: String): Option[Any] = {
context.get(key)
}
}
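
A brief usage sketch of the `ContextualDataStore` trait above; the `SessionContext` class and the keys used are illustrative only, not part of this commit.

```scala
import org.opensearch.flint.data.ContextualDataStore

// Hypothetical carrier class that mixes in the trait.
class SessionContext extends ContextualDataStore

object ContextualDataStoreExample extends App {
  val session = new SessionContext
  session.setContextValue("applicationId", "app-123") // store a value under an illustrative key
  println(session.getContextValue("applicationId"))   // Some(app-123)
  println(session.getContextValue("missing"))         // None
}
```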
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.data

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

object StatementStates {
val RUNNING = "running"
val SUCCESS = "success"
val FAILED = "failed"
val WAITING = "waiting"
}

/**
* Represents a statement processed in the Flint job.
*
* @param state
* The current state of the statement.
* @param query
* SQL-like query string that the statement will execute.
* @param statementId
* Unique identifier for the type of statement.
* @param queryId
* Unique identifier for the query.
* @param submitTime
* Timestamp when the statement was submitted.
* @param error
* Optional error message if the statement fails.
* @param statementContext
* Additional context for the statement as key-value pairs.
*/
class FlintStatement(
var state: String,
val query: String,
// statementId is the statement type doc id
val statementId: String,
val queryId: String,
val submitTime: Long,
var error: Option[String] = None,
statementContext: Map[String, Any] = Map.empty[String, Any])
extends ContextualDataStore {
context = statementContext

def running(): Unit = state = StatementStates.RUNNING
def complete(): Unit = state = StatementStates.SUCCESS
def fail(): Unit = state = StatementStates.FAILED
def isRunning: Boolean = state == StatementStates.RUNNING
def isComplete: Boolean = state == StatementStates.SUCCESS
def isFailed: Boolean = state == StatementStates.FAILED
def isWaiting: Boolean = state == StatementStates.WAITING

// Does not include context, which could contain sensitive information.
override def toString: String =
s"FlintStatement(state=$state, query=$query, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)"
}

object FlintStatement {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(statement: String): FlintStatement = {
val meta = parse(statement)
val state = (meta \ "state").extract[String]
val query = (meta \ "query").extract[String]
val statementId = (meta \ "statementId").extract[String]
val queryId = (meta \ "queryId").extract[String]
val submitTime = (meta \ "submitTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintStatement(state, query, statementId, queryId, submitTime, maybeError)
}

def serialize(flintStatement: FlintStatement): String = {
// we only need to modify state and error
Serialization.write(
Map("state" -> flintStatement.state, "error" -> flintStatement.error.getOrElse("")))
}
}
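
A round-trip sketch using the companion object above: deserialize a statement document, update its state, and serialize back the fields that `serialize` writes (`state` and `error`). The JSON field values are made up for illustration.

```scala
import org.opensearch.flint.data.FlintStatement

object FlintStatementExample extends App {
  // Illustrative statement document; field values are not from this commit.
  val doc =
    """{"state":"waiting","query":"SELECT 1","statementId":"st-1",
      |"queryId":"q-1","submitTime":1718000000000,"error":""}""".stripMargin

  val stmt = FlintStatement.deserialize(doc)
  stmt.running()                            // transition state to "running"
  println(FlintStatement.serialize(stmt))   // {"state":"running","error":""}
}
```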