Skip to content

Commit

Permalink
Enhance withTx API with UT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 3, 2024
1 parent cc52b1c commit c54546a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import scala.collection.JavaConverters._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
Expand Down Expand Up @@ -46,7 +45,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
override protected val flintClient: FlintClient =
FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintIndexMetadataService: FlintIndexMetadataService = {
FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
Expand Down Expand Up @@ -156,7 +156,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
}
}.flatten

/**
* Describe all Flint indexes whose name matches the given pattern.
Expand Down Expand Up @@ -225,7 +225,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
}
}.flatten
}

/**
Expand Down Expand Up @@ -254,7 +254,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to be deleted doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Delete a Flint index physically.
Expand All @@ -280,7 +280,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to vacuum doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Recover index job.
Expand All @@ -307,24 +307,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
logInfo("Index to be recovered is not auto refreshed")
false
}
}
}.getOrElse(false)

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@
package org.opensearch.flint.spark

import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.FlintClient

import org.apache.spark.internal.Logging

/**
* Provides transaction support with proper error handling and logging capabilities.
*
* @note
* This trait requires the mixing class to extend Spark's `Logging` to utilize its logging
* functionalities. Meanwhile it needs to provide `FlintClient` and data source name so this
* trait can help create transaction context.
* This trait requires the mixing class to provide both `FlintClient` and
* `FlintMetadataLogService` so this trait can help create transaction context.
*/
trait FlintSparkTransactionSupport { self: Logging =>
trait FlintSparkTransactionSupport extends Logging {

/** Flint client defined in the mixing class */
protected def flintClient: FlintClient

/** Flint metadata log service defined in the mixing class */
protected def flintMetadataLogService: FlintMetadataLogService

/**
* Executes a block of code within a transaction context, handling and logging errors
* appropriately. This method logs the start and completion of the transaction and captures any
* exceptions that occur, enriching them with detailed error messages before re-throwing.
* exceptions that occur, enriching them with detailed error messages before re-throwing. If the
* index data is missing (excluding index creation actions), the operation is bypassed, and any
* dangling metadata log entries are cleaned up.
*
* @param indexName
* the name of the index on which the operation is performed
Expand All @@ -39,19 +45,38 @@ trait FlintSparkTransactionSupport { self: Logging =>
* @tparam T
* the type of the result produced by the operation block
* @return
* the result of the operation block
* Some(result) of the operation block if the operation is executed, or None if the operation
* execution is bypassed due to missing index data
*/
def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
opBlock: OptimisticTransaction[T] => T): T = {
opBlock: OptimisticTransaction[T] => T): Option[T] = {
logInfo(s"Starting index operation [$opName $indexName] with forceInit=$forceInit")
try {
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
// Execute the action if data index exists or create index action (indicated by forceInit)
if (forceInit || flintClient.exists(indexName)) {

val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
result
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
Some(result)
} else {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning(
s"Bypassing index operation [$opName $indexName] as index data has been deleted")
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
None
}
} catch {
case e: Exception =>
logError(s"Failed to execute index operation [$opName $indexName]", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,76 @@

package org.opensearch.flint.spark

import org.mockito.Mockito.{times, verify}
import org.mockito.Mockito.{never, reset, times, verify, when, RETURNS_DEEP_STUBS}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.FlintClient
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.FlintSuite
import org.apache.spark.internal.Logging

class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {

private val mockFlintMetadataLogService: FlintMetadataLogService = mock[FlintMetadataLogService]
private val mockFlintClient: FlintClient = mock[FlintClient]
private val mockFlintMetadataLogService: FlintMetadataLogService =
mock[FlintMetadataLogService](RETURNS_DEEP_STUBS)
private val testIndex = "test_index"
private val testOpName = "test operation"

/** Creating a fake FlintSparkTransactionSupport subclass for test */
private val transactionSupport = new FlintSparkTransactionSupport with Logging {
private val transactionSupport = new FlintSparkTransactionSupport {
override protected def flintClient: FlintClient = mockFlintClient
override protected def flintMetadataLogService: FlintMetadataLogService =
mockFlintMetadataLogService
}

test("with transaction without force initialization") {
transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ => }
override protected def beforeEach(): Unit = {
reset(mockFlintClient, mockFlintMetadataLogService)
}

test("execute transaction without force initialization when index exists") {
when(mockFlintClient.exists(testIndex)).thenReturn(true)
val result =
transactionSupport
.withTransaction[Boolean](testIndex, testOpName) { _ => true }

result shouldBe Some(true)
verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, false)
}

test("with transaction with force initialization") {
transactionSupport.withTransaction[Unit](testIndex, testOpName, forceInit = true) { _ => }
test("execute transaction with force initialization when index exists") {
when(mockFlintClient.exists(testIndex)).thenReturn(true)
val result =
transactionSupport
.withTransaction[Boolean](testIndex, testOpName, forceInit = true) { _ => true }

result shouldBe Some(true)
verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, true)
}

test("bypass transaction without force initialization when index does not exist") {
when(mockFlintClient.exists(testIndex)).thenReturn(false)
val result =
transactionSupport
.withTransaction[Boolean](testIndex, testOpName) { _ => true }

result shouldBe None
}

test("execute transaction with force initialization even if index does not exist") {
when(mockFlintClient.exists(testIndex)).thenReturn(false)
val result =
transactionSupport
.withTransaction[Boolean](testIndex, testOpName, forceInit = true) { _ => true }

result shouldBe Some(true)
verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, true)
}

test("should throw original exception") {
test("propagate original exception thrown within transaction") {
the[RuntimeException] thrownBy {
when(mockFlintClient.exists(testIndex)).thenReturn(true)

transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ =>
val rootCause = new IllegalArgumentException("Fake root cause")
val cause = new RuntimeException("Fake cause", rootCause)
Expand Down

0 comments on commit c54546a

Please sign in to comment.