Skip to content

Commit

Permalink
Change FlintSpark API to update job start time
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 3, 2023
1 parent 528c8ff commit 53b2f72
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,22 @@ case class FlintMetadataLogEntry(
id: String,
seqNo: Long,
primaryTerm: Long,
/**
* This is currently used as streaming job start time. In future, this should represent the
* create timestamp of the log entry
*/
createTime: Long,
state: IndexState,
dataSource: String, // TODO: get from Spark conf
dataSource: String,
error: String) {

def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
this(
id,
seqNo,
primaryTerm,
map.get("jobStartTime").asInstanceOf[Long],
/* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */
map.get("jobStartTime").asInstanceOf[Number].longValue(),
IndexState.from(map.get("state").asInstanceOf[String]),
map.get("dataSourceName").asInstanceOf[String],
map.get("error").asInstanceOf[String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
} else {
// Schedule regular update and return log entry as refreshing state
scheduleIndexStateUpdate(indexName)
latest
// Update job start time. TODO: make same changes for recoverIndex
latest.copy(createTime = System.currentTimeMillis())
}
})
.commit(_ => doRefreshIndex(index, indexName, mode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.addPartitions("year", "month")
.create()

latestLogEntry(testLatestId) should contain("state" -> "active")
latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId)
and contain("state" -> "active")
and contain("jobStartTime" -> 0)
and contain("dataSourceName" -> testDataSourceName))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val mapping =
Expand All @@ -63,17 +66,32 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
.create()
flint.refreshIndex(testFlintIndex, FULL)

latestLogEntry(testLatestId) should contain("state" -> "active")
latestLogEntry(testLatestId) should (contain("state" -> "active")
and contain("jobStartTime" -> 0))
}

test("incremental refresh index") {
// TODO: this test needs recover command
ignore("incremental refresh index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
flint.refreshIndex(testFlintIndex, INCREMENTAL)
latestLogEntry(testLatestId) should contain("state" -> "refreshing")

// Job start time should be assigned
var latest = latestLogEntry(testLatestId)
latest should contain("state" -> "refreshing")
val prevStartTime = latest("jobStartTime").asInstanceOf[Number].longValue()
prevStartTime should be > 0L

// Restart streaming job
spark.streams.active.head.stop()
// flint.refreshIndex(testFlintIndex, INCREMENTAL)

// Make sure job start time is updated
latest = latestLogEntry(testLatestId)
latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime
}

test("delete index") {
Expand Down

0 comments on commit 53b2f72

Please sign in to comment.