[Refactor] Introduce flint-commons for models and interfaces (#373)
* [Refactor] Introduce flint-data for model and interface

Signed-off-by: Louis Chu <[email protected]>

* Uppercase for constant

Signed-off-by: Louis Chu <[email protected]>

* Address comments

Signed-off-by: Louis Chu <[email protected]>

* Rename package

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
noCharger authored Jun 12, 2024
1 parent 385a344 commit 80d8f6e
Showing 11 changed files with 346 additions and 218 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -1,8 +1,9 @@
# OpenSearch Flint

-OpenSearch Flint is ... It consists of two modules:
+OpenSearch Flint is ... It consists of four modules:

- `flint-core`: a module that contains Flint specification and client.
- `flint-commons`: a module that provides a shared library of utilities and common functionality, designed to make Flint's capabilities easy to extend.
- `flint-spark-integration`: a module that provides Spark integration for Flint and derived dataset based on it.
- `ppl-spark-integration`: a module that provides PPL query execution on top of Spark. See the [PPL repository](https://github.com/opensearch-project/piped-processing-language).

37 changes: 34 additions & 3 deletions build.sbt
@@ -48,7 +48,7 @@ lazy val commonSettings = Seq(

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
-.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
+.aggregate(flintCommons, flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -84,6 +84,37 @@ lazy val flintCore = (project in file("flint-core"))
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)

lazy val flintCommons = (project in file("flint-commons"))
.settings(
commonSettings,
name := "flint-commons",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
assembly / test := (Test / test).value,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
)
.enablePlugins(AssemblyPlugin)


lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
@@ -121,7 +152,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
assembly / test := (Test / test).value)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
-.dependsOn(flintCore)
+.dependsOn(flintCore, flintCommons)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
@@ -166,7 +197,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
-.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
+.dependsOn(flintCommons % "test->test", flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.data

/**
* Provides a mutable map to store and retrieve contextual data using key-value pairs.
*/
trait ContextualDataStore {

/** Holds the contextual data as key-value pairs. */
var context: Map[String, Any] = Map.empty

/**
* Adds a key-value pair to the context map.
*/
def setContextValue(key: String, value: Any): Unit = {
context += (key -> value)
}

/**
* Retrieves the value associated with a key from the context map.
*/
def getContextValue(key: String): Option[Any] = {
context.get(key)
}
}
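
A usage sketch for the trait above (the JobRecord class and the keys used here are hypothetical, for illustration only; they are not part of this commit):

import org.opensearch.flint.data.ContextualDataStore

// Hypothetical example: any class can mix in ContextualDataStore.
class JobRecord extends ContextualDataStore

object ContextualDataStoreExample extends App {
  val record = new JobRecord
  record.setContextValue("requestId", "req-123") // add a key-value pair
  println(record.getContextValue("requestId"))   // Some(req-123)
  println(record.getContextValue("missing"))     // None
}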
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.data

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

object StatementStates {
val RUNNING = "running"
val SUCCESS = "success"
val FAILED = "failed"
val WAITING = "waiting"
}

/**
* Represents a statement processed in the Flint job.
*
* @param state
* The current state of the statement.
* @param query
* SQL-like query string that the statement will execute.
* @param statementId
Unique identifier for the statement (the statement doc ID).
* @param queryId
* Unique identifier for the query.
* @param submitTime
* Timestamp when the statement was submitted.
* @param error
* Optional error message if the statement fails.
* @param statementContext
* Additional context for the statement as key-value pairs.
*/
class FlintStatement(
var state: String,
val query: String,
// statementId is the statement type doc id
val statementId: String,
val queryId: String,
val submitTime: Long,
var error: Option[String] = None,
statementContext: Map[String, Any] = Map.empty[String, Any])
extends ContextualDataStore {
context = statementContext

def running(): Unit = state = StatementStates.RUNNING
def complete(): Unit = state = StatementStates.SUCCESS
def fail(): Unit = state = StatementStates.FAILED
def isRunning: Boolean = state == StatementStates.RUNNING
def isComplete: Boolean = state == StatementStates.SUCCESS
def isFailed: Boolean = state == StatementStates.FAILED
def isWaiting: Boolean = state == StatementStates.WAITING

// Does not include context, which could contain sensitive information.
override def toString: String =
s"FlintStatement(state=$state, query=$query, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)"
}

object FlintStatement {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(statement: String): FlintStatement = {
val meta = parse(statement)
val state = (meta \ "state").extract[String]
val query = (meta \ "query").extract[String]
val statementId = (meta \ "statementId").extract[String]
val queryId = (meta \ "queryId").extract[String]
val submitTime = (meta \ "submitTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintStatement(state, query, statementId, queryId, submitTime, maybeError)
}

def serialize(flintStatement: FlintStatement): String = {
// we only need to modify state and error
Serialization.write(
Map("state" -> flintStatement.state, "error" -> flintStatement.error.getOrElse("")))
}
}
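
A round-trip sketch for FlintStatement (the JSON document below is an assumed shape, inferred from the fields read in deserialize; it is not taken from the commit):

import org.opensearch.flint.data.FlintStatement

object FlintStatementExample extends App {
  // Hypothetical input document; field names match those read in deserialize.
  val json =
    """{"state":"waiting","query":"SELECT 1","statementId":"st-1","queryId":"q-1","submitTime":1718150400000}"""
  val stmt = FlintStatement.deserialize(json)
  println(stmt.isWaiting) // true
  stmt.running()
  println(FlintStatement.serialize(stmt)) // {"state":"running","error":""}
}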
@@ -3,42 +3,78 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.app
+package org.opensearch.flint.data

import java.util.{Map => JavaMap}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.json4s.{Formats, JNothing, JNull, NoTypeHints}
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

-// lastUpdateTime is added to FlintInstance to track the last update time of the instance. Its unit is millisecond.
-class FlintInstance(
object SessionStates {
val RUNNING = "running"
val COMPLETE = "complete"
val FAILED = "failed"
val WAITING = "waiting"
}

/**
* Represents an interactive session for job and state management.
*
* @param applicationId
* Unique identifier for the EMR-S application.
* @param jobId
* Identifier for the specific EMR-S job.
* @param sessionId
* Unique session identifier.
* @param state
* Current state of the session.
* @param lastUpdateTime
* Timestamp of the last update.
* @param jobStartTime
* Start time of the job.
* @param excludedJobIds
* List of job IDs that are excluded.
* @param error
* Optional error message.
* @param sessionContext
* Additional context for the session.
*/
class InteractiveSession(
val applicationId: String,
val jobId: String,
// sessionId is the session type doc id
val sessionId: String,
var state: String,
val lastUpdateTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
-val error: Option[String] = None) {
+val error: Option[String] = None,
sessionContext: Map[String, Any] = Map.empty[String, Any])
extends ContextualDataStore {
context = sessionContext // Initialize the context from the constructor

def isRunning: Boolean = state == SessionStates.RUNNING
def isComplete: Boolean = state == SessionStates.COMPLETE
def isFailed: Boolean = state == SessionStates.FAILED
def isWaiting: Boolean = state == SessionStates.WAITING

override def toString: String = {
val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
val errorStr = error.getOrElse("None")
// Does not include context, which could contain sensitive information.
s"FlintInstance(applicationId=$applicationId, jobId=$jobId, sessionId=$sessionId, state=$state, " +
s"lastUpdateTime=$lastUpdateTime, jobStartTime=$jobStartTime, excludedJobIds=$excludedJobIdsStr, error=$errorStr)"
}
}

-object FlintInstance {
+object InteractiveSession {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

-def deserialize(job: String): FlintInstance = {
+def deserialize(job: String): InteractiveSession = {
val meta = parse(job)
val applicationId = (meta \ "applicationId").extract[String]
val state = (meta \ "state").extract[String]
@@ -64,7 +100,7 @@ object FlintInstance {
case _ => None
}

-new FlintInstance(
+new InteractiveSession(
applicationId,
jobId,
sessionId,
@@ -75,7 +111,7 @@
maybeError)
}

-def deserializeFromMap(source: JavaMap[String, AnyRef]): FlintInstance = {
+def deserializeFromMap(source: JavaMap[String, AnyRef]): InteractiveSession = {
// Since we are dealing with JavaMap, we convert it to a Scala mutable Map for ease of use.
val scalaSource = source.asScala

@@ -105,7 +141,7 @@
}

// Construct a new InteractiveSession with the extracted values.
-new FlintInstance(
+new InteractiveSession(
applicationId,
jobId,
sessionId,
@@ -133,7 +169,10 @@
* @return
* serialized Flint session
*/
-def serialize(job: FlintInstance, currentTime: Long, includeJobId: Boolean = true): String = {
+def serialize(
+    job: InteractiveSession,
+    currentTime: Long,
+    includeJobId: Boolean = true): String = {
val baseMap = Map(
"type" -> "session",
"sessionId" -> job.sessionId,
@@ -159,7 +198,7 @@
Serialization.write(resultMap)
}

-def serializeWithoutJobId(job: FlintInstance, currentTime: Long): String = {
+def serializeWithoutJobId(job: InteractiveSession, currentTime: Long): String = {
serialize(job, currentTime, includeJobId = false)
}
}
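
A construction-and-serialization sketch for InteractiveSession (identifiers and the context key are hypothetical values for illustration; only the constructor and methods come from the code above):

import org.opensearch.flint.data.{InteractiveSession, SessionStates}

object InteractiveSessionExample extends App {
  // Hypothetical identifiers; jobStartTime, excludedJobIds and error keep their defaults.
  val session = new InteractiveSession(
    applicationId = "app-1",
    jobId = "job-1",
    sessionId = "sess-1",
    state = SessionStates.RUNNING,
    lastUpdateTime = System.currentTimeMillis())
  println(session.isRunning) // true
  session.setContextValue("user", "alice") // provided by ContextualDataStore
  println(InteractiveSession.serialize(session, System.currentTimeMillis()))
}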