Skip to content

Commit

Permalink
Make states always in upper case
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Aug 8, 2024
1 parent e11abc3 commit f258018
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@

package org.opensearch.flint.common.model

import java.util.Locale

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

object StatementStates {
val RUNNING = "running"
val SUCCESS = "success"
val FAILED = "failed"
val TIMEOUT = "timeout"
val WAITING = "waiting"
val RUNNING = "RUNNING"
val SUCCESS = "SUCCESS"
val FAILED = "FAILED"
val TIMEOUT = "TIMEOUT"
val WAITING = "WAITING"
}

/**
Expand Down Expand Up @@ -53,10 +55,10 @@ class FlintStatement(
def fail(): Unit = state = StatementStates.FAILED
def timeout(): Unit = state = StatementStates.TIMEOUT

def isRunning: Boolean = state == StatementStates.RUNNING
def isComplete: Boolean = state == StatementStates.SUCCESS
def isFailed: Boolean = state == StatementStates.FAILED
def isWaiting: Boolean = state == StatementStates.WAITING
def isRunning: Boolean = state.equalsIgnoreCase(StatementStates.RUNNING)
def isComplete: Boolean = state.equalsIgnoreCase(StatementStates.SUCCESS)
def isFailed: Boolean = state.equalsIgnoreCase(StatementStates.FAILED)
def isWaiting: Boolean = state.equalsIgnoreCase(StatementStates.WAITING)

// Does not include context, which could contain sensitive information.
override def toString: String =
Expand All @@ -69,7 +71,7 @@ object FlintStatement {

def deserialize(statement: String): FlintStatement = {
val meta = parse(statement)
val state = (meta \ "state").extract[String]
val state = (meta \ "state").extract[String].toUpperCase(Locale.ROOT)
val query = (meta \ "query").extract[String]
val statementId = (meta \ "statementId").extract[String]
val queryId = (meta \ "queryId").extract[String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.common.model

import java.util.{Map => JavaMap}
import java.util.{Locale, Map => JavaMap}

import scala.collection.JavaConverters._

Expand All @@ -15,9 +15,9 @@ import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

object SessionStates {
val RUNNING = "running"
val DEAD = "dead"
val FAIL = "fail"
val RUNNING = "RUNNING"
val DEAD = "DEAD"
val FAIL = "FAIL"
}

/**
Expand Down Expand Up @@ -59,9 +59,9 @@ class InteractiveSession(
def complete(): Unit = state = SessionStates.DEAD
def fail(): Unit = state = SessionStates.FAIL

def isRunning: Boolean = state == SessionStates.RUNNING
def isComplete: Boolean = state == SessionStates.DEAD
def isFail: Boolean = state == SessionStates.FAIL
def isRunning: Boolean = state.equalsIgnoreCase(SessionStates.RUNNING)
def isComplete: Boolean = state.equalsIgnoreCase(SessionStates.DEAD)
def isFail: Boolean = state.equalsIgnoreCase(SessionStates.FAIL)

override def toString: String = {
val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
Expand All @@ -79,7 +79,7 @@ object InteractiveSession {
def deserialize(job: String): InteractiveSession = {
val meta = parse(job)
val applicationId = (meta \ "applicationId").extract[String]
val state = (meta \ "state").extract[String]
val state = (meta \ "state").extract[String].toUpperCase(Locale.ROOT)
val jobId = (meta \ "jobId").extract[String]
val sessionId = (meta \ "sessionId").extract[String]
val lastUpdateTime = (meta \ "lastUpdateTime").extract[Long]
Expand Down Expand Up @@ -118,7 +118,7 @@ object InteractiveSession {
val scalaSource = source.asScala

val applicationId = scalaSource("applicationId").asInstanceOf[String]
val state = scalaSource("state").asInstanceOf[String]
val state = scalaSource("state").asInstanceOf[String].toUpperCase(Locale.ROOT)
val jobId = scalaSource("jobId").asInstanceOf[String]
val sessionId = scalaSource("sessionId").asInstanceOf[String]
val lastUpdateTime = scalaSource("lastUpdateTime").asInstanceOf[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class InteractiveSessionTest extends SparkFunSuite with Matchers {

test("deserialize should correctly parse a FlintInstance with excludedJobIds from JSON") {
val json =
"""{"applicationId":"app-123","jobId":"job-456","sessionId":"session-789","state":"RUNNING","lastUpdateTime":1620000000000,"jobStartTime":1620000001000,"excludeJobIds":["job-101","job-202"]}"""
"""{"applicationId":"app-123","jobId":"job-456","sessionId":"session-789","state":"running","lastUpdateTime":1620000000000,"jobStartTime":1620000001000,"excludeJobIds":["job-101","job-202"]}"""
val instance = InteractiveSession.deserialize(json)

instance.applicationId shouldBe "app-123"
Expand Down Expand Up @@ -101,7 +101,7 @@ class InteractiveSessionTest extends SparkFunSuite with Matchers {
assert(result.applicationId == "app1")
assert(result.jobId == "job1")
assert(result.sessionId == "session1")
assert(result.state == "running")
assert(result.state == "RUNNING")
assert(result.lastUpdateTime == 1234567890L)
assert(result.jobStartTime == 9876543210L)
assert(result.excludedJobIds == Seq("job2", "job3"))
Expand Down Expand Up @@ -134,7 +134,7 @@ class InteractiveSessionTest extends SparkFunSuite with Matchers {
assert(result.applicationId == "app1")
assert(result.jobId == "job1")
assert(result.sessionId == "session1")
assert(result.state == "running")
assert(result.state == "RUNNING")
assert(result.lastUpdateTime == 1234567890L)
assert(result.jobStartTime == 0L) // Default value for missing jobStartTime
assert(result.excludedJobIds == Seq("job2", "job3"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.state == "success"
statement.isComplete
},
selectStatementId),
s"Fail to verify for $selectStatementId.")
Expand Down Expand Up @@ -344,7 +344,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.state == "success"
statement.isComplete
},
descStatementId),
s"Fail to verify for $descStatementId.")
Expand Down Expand Up @@ -381,7 +381,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.state == "success"
statement.isComplete
},
showTableStatementId),
s"Fail to verify for $showTableStatementId.")
Expand All @@ -401,7 +401,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.state == "failed"
statement.isFailed
},
wrongSelectStatementId),
s"Fail to verify for $wrongSelectStatementId.")
Expand All @@ -410,7 +410,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
awaitConditionForStatementOrTimeout(
statement => {
statement.state != "waiting"
!statement.isWaiting
},
lateSelectStatementId),
s"Fail to verify for $lateSelectStatementId.")
Expand Down Expand Up @@ -471,7 +471,7 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
case _ =>
fail(s"Statement error is: ${statement.error}")
}
statement.state == "failed"
statement.isFailed
},
createTableStatementId),
s"Fail to verify for $createTableStatementId.")
Expand Down Expand Up @@ -558,15 +558,15 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.state == "success"
statement.isComplete
},
selectStatementId),
s"Fail to verify for $selectStatementId.")

assert(
awaitConditionForStatementOrTimeout(
statement => {
statement.state != "waiting"
!statement.isWaiting
},
lateSelectStatementId),
s"Fail to verify for $lateSelectStatementId.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ class FlintREPLTest

verify(mockSparkSession, times(1)).sql(any[String])
verify(sparkContext, times(1)).cancelJobGroup(any[String])
assert("timeout" == flintStatement.state)
assert("TIMEOUT" == flintStatement.state)
assert(s"Executing ${flintStatement.query} timed out" == flintStatement.error.get)
result should not be None
} finally threadPool.shutdown()
Expand Down

0 comments on commit f258018

Please sign in to comment.