Skip to content

Commit

Permalink
[Refactor] Introduce flint-data for model and interface
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jun 10, 2024
1 parent 689788a commit 2f5e715
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 92 deletions.
37 changes: 34 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ lazy val commonSettings = Seq(

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.aggregate(flintData, flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

Expand Down Expand Up @@ -84,6 +84,37 @@ lazy val flintCore = (project in file("flint-core"))
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)

lazy val flintData = (project in file("flint-data"))
.settings(
commonSettings,
name := "flint-data",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
assembly / test := (Test / test).value,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
)
.enablePlugins(AssemblyPlugin)


lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
Expand Down Expand Up @@ -121,7 +152,7 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
assembly / test := (Test / test).value)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
.dependsOn(flintCore, flintData)
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
Expand Down Expand Up @@ -166,7 +197,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.dependsOn(flintData % "test->test", flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.data

/**
* Provides a mutable map to store and retrieve contextual data using key-value pairs.
*/
trait ContextualData {

/** Holds the contextual data as key-value pairs. */
var context: Map[String, Any] = Map.empty

/**
* Adds a key-value pair to the context map.
*
* @param key
* The key under which the value is stored.
* @param value
* The data value to store.
*/
def addContext(key: String, value: Any): Unit = {
context += (key -> value)
}

/**
* Retrieves the value associated with a key from the context map.
*
* @param key
* The key whose value needs to be retrieved.
* @return
* An option containing the value if it exists, None otherwise.
*/
def getContext(key: String): Option[Any] = {
context.get(key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,61 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app
package org.opensearch.flint.data

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

object CommandStates {
val Running = "running"
val Success = "success"
val Failed = "failed"
val Waiting = "waiting"
}

/**
* Represents a command processed in the Flint job.
*
* @param state
* The current state of the command.
* @param query
* SQL-like query string that the command will execute.
* @param statementId
* Unique identifier for the type of statement.
* @param queryId
* Unique identifier for the query.
* @param submitTime
* Timestamp when the command was submitted.
* @param error
* Optional error message if the command fails.
* @param commandContext
* Additional context for the command as key-value pairs.
*/
class FlintCommand(
var state: String,
val query: String,
// statementId is the statement type doc id
val statementId: String,
val queryId: String,
val submitTime: Long,
var error: Option[String] = None) {
def running(): Unit = {
state = "running"
}

def complete(): Unit = {
state = "success"
}

def fail(): Unit = {
state = "failed"
}
var error: Option[String] = None,
commandContext: Map[String, Any] = Map.empty[String, Any])
extends ContextualData {
context = commandContext

def isRunning(): Boolean = {
state == "running"
}

def isComplete(): Boolean = {
state == "success"
}

def isFailed(): Boolean = {
state == "failed"
}

def isWaiting(): Boolean = {
state == "waiting"
}
def running(): Unit = state = CommandStates.Running
def complete(): Unit = state = CommandStates.Success
def fail(): Unit = state = CommandStates.Failed
def isRunning: Boolean = state == CommandStates.Running
def isComplete: Boolean = state == CommandStates.Success
def isFailed: Boolean = state == CommandStates.Failed
def isWaiting: Boolean = state == CommandStates.Waiting

override def toString: String = {
// Does not include context, which could contain sensitive information.
override def toString: String =
s"FlintCommand(state=$state, query=$query, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)"
}
}

object FlintCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,78 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app
package org.opensearch.flint.data

import java.util.{Map => JavaMap}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.json4s.{Formats, JNothing, JNull, NoTypeHints}
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

// lastUpdateTime is added to FlintInstance to track the last update time of the instance. Its unit is millisecond.
class FlintInstance(
object SessionStates {
val Running = "running"
val Complete = "complete"
val Failed = "failed"
val Waiting = "waiting"
}

/**
* Represents an interactive session for job and state management.
*
* @param applicationId
* Unique identifier for the EMR-S application.
* @param jobId
* Identifier for the specific EMR-S job.
* @param sessionId
* Unique session identifier.
* @param state
* Current state of the session.
* @param lastUpdateTime
* Timestamp of the last update.
* @param jobStartTime
* Start time of the job.
* @param excludedJobIds
* List of job IDs that are excluded.
* @param error
* Optional error message.
* @param sessionContext
* Additional context for the session.
*/
class InteractiveSession(
val applicationId: String,
val jobId: String,
// sessionId is the session type doc id
val sessionId: String,
var state: String,
val lastUpdateTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
val error: Option[String] = None) {
val error: Option[String] = None,
sessionContext: Map[String, Any] = Map.empty[String, Any])
extends ContextualData {
context = sessionContext // Initialize the context from the constructor

def isRunning: Boolean = state == SessionStates.Running
def isComplete: Boolean = state == SessionStates.Complete
def isFailed: Boolean = state == SessionStates.Failed
def isWaiting: Boolean = state == SessionStates.Waiting

override def toString: String = {
val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
val errorStr = error.getOrElse("None")
// Does not include context, which could contain sensitive information.
s"FlintInstance(applicationId=$applicationId, jobId=$jobId, sessionId=$sessionId, state=$state, " +
s"lastUpdateTime=$lastUpdateTime, jobStartTime=$jobStartTime, excludedJobIds=$excludedJobIdsStr, error=$errorStr)"
}
}

object FlintInstance {
object InteractiveSession {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(job: String): FlintInstance = {
def deserialize(job: String): InteractiveSession = {
val meta = parse(job)
val applicationId = (meta \ "applicationId").extract[String]
val state = (meta \ "state").extract[String]
Expand All @@ -64,7 +100,7 @@ object FlintInstance {
case _ => None
}

new FlintInstance(
new InteractiveSession(
applicationId,
jobId,
sessionId,
Expand All @@ -75,7 +111,7 @@ object FlintInstance {
maybeError)
}

def deserializeFromMap(source: JavaMap[String, AnyRef]): FlintInstance = {
def deserializeFromMap(source: JavaMap[String, AnyRef]): InteractiveSession = {
// Since we are dealing with JavaMap, we convert it to a Scala mutable Map for ease of use.
val scalaSource = source.asScala

Expand Down Expand Up @@ -105,7 +141,7 @@ object FlintInstance {
}

// Construct a new FlintInstance with the extracted values.
new FlintInstance(
new InteractiveSession(
applicationId,
jobId,
sessionId,
Expand Down Expand Up @@ -133,7 +169,10 @@ object FlintInstance {
* @return
* serialized Flint session
*/
def serialize(job: FlintInstance, currentTime: Long, includeJobId: Boolean = true): String = {
def serialize(
job: InteractiveSession,
currentTime: Long,
includeJobId: Boolean = true): String = {
val baseMap = Map(
"type" -> "session",
"sessionId" -> job.sessionId,
Expand All @@ -159,7 +198,7 @@ object FlintInstance {
Serialization.write(resultMap)
}

def serializeWithoutJobId(job: FlintInstance, currentTime: Long): String = {
def serializeWithoutJobId(job: InteractiveSession, currentTime: Long): String = {
serialize(job, currentTime, includeJobId = false)
}
}
Loading

0 comments on commit 2f5e715

Please sign in to comment.