Skip to content

Commit

Permalink
Cherry pick and fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 27, 2024
1 parent a890dbb commit 9f9a5b1
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
}

// Append optional transient log
Expand Down Expand Up @@ -122,7 +122,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
} catch (Exception ex) {
LOG.log(WARNING, "Failed to rollback transient log", ex);
}
throw new IllegalStateException("Failed to commit transaction operation");
throw new IllegalStateException("Failed to commit transaction operation", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
Expand All @@ -31,7 +32,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGN
/**
* Flint Spark integration API entrypoint.
*/
class FlintSpark(val spark: SparkSession) extends Logging {
class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {

/** Flint spark configuration */
private val flintSparkConf: FlintSparkConf =
Expand All @@ -41,7 +42,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
override protected val flintClient: FlintClient =
FlintClientBuilder.build(flintSparkConf.flintOptions())

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
Expand All @@ -50,7 +52,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
* future
*/
private val dataSourceName: String =
override protected val dataSourceName: String =
spark.conf.getOption("spark.flint.datasource.name").getOrElse("")

/** Flint Spark index monitor */
Expand Down Expand Up @@ -95,18 +97,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, dataSourceName, true)
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -117,14 +117,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}

/**
* Start refreshing index data according to the given mode.
Expand All @@ -134,15 +128,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintClient
.startTransaction(indexName, dataSourceName)
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, "Refresh Flint index") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
tx
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
Expand All @@ -158,12 +149,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

/**
* Describe all Flint indexes whose name matches the given pattern.
Expand Down Expand Up @@ -214,25 +200,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

val indexName = index.name()
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

Expand All @@ -244,12 +224,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest =>
latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
.transientLog(latest => latest.copy(state = DELETING))
Expand All @@ -260,16 +238,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}

/**
* Delete a Flint index physically.
Expand All @@ -279,81 +252,66 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Vacuum Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Recover Flint index") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ =>
.commit(_ => {
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
.start(spark, flintSparkConf)
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
}
false
}
}

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
Expand Down Expand Up @@ -432,11 +390,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
Expand All @@ -450,12 +409,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
private def updateIndexManualToAuto(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
Expand Down
Loading

0 comments on commit 9f9a5b1

Please sign in to comment.