Skip to content

Commit

Permalink
Handle Missing jobStartTime in JSON Deserialization
Browse files Browse the repository at this point in the history
- Added handling for scenarios where jobStartTime is not present in the JSON input.
- Ensures FlintInstance deserialization remains robust and error-free even when jobStartTime is missing.

Testing:
  1. Extended unit tests to cover the new case.
  2. Conducted manual sanity tests to ensure stability and correctness.
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Nov 13, 2023
1 parent 1f31e56 commit 2e45e27
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.json4s.{Formats, NoTypeHints}
import org.json4s.{Formats, JNothing, JNull, NoTypeHints}
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
Expand All @@ -24,7 +24,7 @@ class FlintInstance(
var state: String,
val lastUpdateTime: Long,
// We need jobStartTime to check if HMAC token is expired or not
val jobStartTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
val error: Option[String] = None) {}

Expand All @@ -39,7 +39,10 @@ object FlintInstance {
val jobId = (meta \ "jobId").extract[String]
val sessionId = (meta \ "sessionId").extract[String]
val lastUpdateTime = (meta \ "lastUpdateTime").extract[Long]
val jobStartTime = (meta \ "jobStartTime").extract[Long]
val jobStartTime: Long = meta \ "jobStartTime" match {
case JNothing | JNull => 0L // Default value for missing or null jobStartTime
case value => value.extract[Long]
}
// To handle the possibility of excludeJobIds not being present,
// we use extractOpt which gives us an Option[Seq[String]].
// If it is not present, it will return None, which we can then
Expand Down Expand Up @@ -75,7 +78,13 @@ object FlintInstance {
val jobId = scalaSource("jobId").asInstanceOf[String]
val sessionId = scalaSource("sessionId").asInstanceOf[String]
val lastUpdateTime = scalaSource("lastUpdateTime").asInstanceOf[Long]
val jobStartTime = scalaSource("jobStartTime").asInstanceOf[Long]
// Safely extract 'jobStartTime' considering potential null or absence
// Safely extract 'jobStartTime' considering potential null or absence
val jobStartTime: Long = scalaSource.get("jobStartTime") match {
case Some(value: java.lang.Long) =>
value.longValue() // Convert java.lang.Long to Scala Long
case _ => 0L // Default value if 'jobStartTime' is null or not present
}

// We safely handle the possibility of excludeJobIds being absent or not a list.
val excludeJobIds: Seq[String] = scalaSource.get("excludeJobIds") match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,41 @@ class FlintInstanceTest extends SparkFunSuite with Matchers {
}
}

test("deserializeFromMap should handle missing jobStartTime") {
val sourceMap = new JavaHashMap[String, AnyRef]()
sourceMap.put("applicationId", "app1")
sourceMap.put("jobId", "job1")
sourceMap.put("sessionId", "session1")
sourceMap.put("state", "running")
sourceMap.put("lastUpdateTime", java.lang.Long.valueOf(1234567890L))
// jobStartTime is not added, simulating its absence
sourceMap.put("excludeJobIds", java.util.Arrays.asList("job2", "job3"))
sourceMap.put("error", "An error occurred")

val result = FlintInstance.deserializeFromMap(sourceMap)

assert(result.applicationId == "app1")
assert(result.jobId == "job1")
assert(result.sessionId == "session1")
assert(result.state == "running")
assert(result.lastUpdateTime == 1234567890L)
assert(result.jobStartTime == 0L) // Default value for missing jobStartTime
assert(result.excludedJobIds == Seq("job2", "job3"))
assert(result.error.contains("An error occurred"))
}

test("deserialize should correctly parse a FlintInstance without jobStartTime from JSON") {
val json =
"""{"applicationId":"app-123","jobId":"job-456","sessionId":"session-789","state":"RUNNING","lastUpdateTime":1620000000000,"excludeJobIds":["job-101","job-202"]}"""
val instance = FlintInstance.deserialize(json)

instance.applicationId shouldBe "app-123"
instance.jobId shouldBe "job-456"
instance.sessionId shouldBe "session-789"
instance.state shouldBe "RUNNING"
instance.lastUpdateTime shouldBe 1620000000000L
instance.jobStartTime shouldBe 0L // Default or expected value for missing jobStartTime
instance.excludedJobIds should contain allOf ("job-101", "job-202")
instance.error shouldBe None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
val excludeJobIds = confExcludeJobs.split(",").toList // Convert Array to Lis

if (excludeJobIds.contains(jobId)) {
// Edge case, current job is excluded, exit the application
logInfo(s"current job is excluded, exit the application.")
return true
}

Expand All @@ -234,7 +234,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
if (source != null) {
val existingExcludedJobIds = parseExcludedJobIds(source)
if (excludeJobIds.sorted == existingExcludedJobIds.sorted) {
// Edge case, duplicate job running, exit the application
logInfo("duplicate job running, exit the application.")
return true
}
}
Expand Down

0 comments on commit 2e45e27

Please sign in to comment.