diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index bfec39f43..fed711368 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -437,9 +437,13 @@ class FlintSpark(val spark: SparkSession) extends Logging { private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = { val indexName = index.name + // TODO: what if index from api user is not constructed with Builder.copyWithUpdate? + val indexLogEntry = index.latestLogEntry.get flintClient .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == REFRESHING) + .initialLog(latest => + // TODO: in the future should use another lock to protect entire process and not rely on this + latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => latest.copy(state = UPDATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(_ => { @@ -453,10 +457,14 @@ class FlintSpark(val spark: SparkSession) extends Logging { private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = { val indexName = index.name + // TODO: what if index from api user is not constructed with Builder.copyWithUpdate? + val indexLogEntry = index.latestLogEntry.get val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) flintClient .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == ACTIVE) + .initialLog(latest => + // TODO: in the future should use another lock to protect entire process and not rely on this + latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => latest.copy(state = UPDATING, createTime = System.currentTimeMillis())) .finalLog(latest => { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index 8de8a69be..b475cf95f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -499,28 +499,29 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .create() // This update will be delayed - val index = flint.describeIndex(testIndex).get + val indexInitial = flint.describeIndex(testIndex).get val updatedIndexObsolete = flint .skippingIndex() - .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) // This other update finishes first, converting to auto refresh flint.updateIndex(flint .skippingIndex() - .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))) + .copyWithUpdate(indexInitial, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))) // Adding another update to convert to full refresh, so the obsolete update doesn't fail for option validation or state validation + val indexUpdated = flint.describeIndex(testIndex).get flint.updateIndex(flint .skippingIndex() - .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "false")))) + .copyWithUpdate(indexUpdated, FlintSparkIndexOptions(Map("auto_refresh" -> "false")))) // This update trying to update an obsolete index should fail the[IllegalStateException] thrownBy flint.updateIndex(updatedIndexObsolete) // Verify index after update - val readNewIndex = flint.describeIndex(testIndex).get - readNewIndex.options.autoRefresh() shouldBe true - readNewIndex.options.checkpointLocation() shouldBe empty + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.autoRefresh() shouldBe false + indexFinal.options.checkpointLocation() shouldBe empty } }