Skip to content

Commit

Permalink
solve update index race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Mar 26, 2024
1 parent 495acaa commit 6495de7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
// TODO: what if index from api user is not constructed with Builder.copyWithUpdate?
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.initialLog(latest =>
// TODO: in the future should use another lock to protect entire process and not rely on this
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(_ => {
Expand All @@ -453,10 +457,14 @@ class FlintSpark(val spark: SparkSession) extends Logging {

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
val indexName = index.name
// TODO: what if index from api user is not constructed with Builder.copyWithUpdate?
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE)
.initialLog(latest =>
// TODO: in the future should use another lock to protect entire process and not rely on this
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
latest.copy(state = UPDATING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,28 +499,29 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
.create()

// This update will be delayed
val index = flint.describeIndex(testIndex).get
val indexInitial = flint.describeIndex(testIndex).get
val updatedIndexObsolete = flint
.skippingIndex()
.copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)))
.copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)))

// This other update finishes first, converting to auto refresh
flint.updateIndex(flint
.skippingIndex()
.copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))))
.copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true"))))
// Adding another update to convert to full refresh, so the obsolete update doesn't fail for option validation or state validation
val indexUpdated = flint.describeIndex(testIndex).get
flint.updateIndex(flint
.skippingIndex()
.copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))))
.copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))))

// This update trying to update an obsolete index should fail
the[IllegalStateException] thrownBy
flint.updateIndex(updatedIndexObsolete)

// Verify index after update
val readNewIndex = flint.describeIndex(testIndex).get
readNewIndex.options.autoRefresh() shouldBe true
readNewIndex.options.checkpointLocation() shouldBe empty
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe false
indexFinal.options.checkpointLocation() shouldBe empty
}
}

Expand Down

0 comments on commit 6495de7

Please sign in to comment.