Skip to content

Commit

Permalink
use metadata log service in transaction IT suites
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jun 12, 2024
1 parent ddb1ac2 commit d1de5e6
Showing 1 changed file with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService
import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO}
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
Expand All @@ -27,16 +29,18 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

val testFlintIndex = "flint_test_index"
val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes)
var flintMetadataLogService: FlintMetadataLogService = _
var flintClient: FlintClient = _

override def beforeAll(): Unit = {
super.beforeAll()
val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName)
flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava))
flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava))
flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava), flintMetadataLogService)
}

test("empty metadata log entry content") {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.id shouldBe testLatestId
Expand All @@ -51,9 +55,11 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
}

test("get index metadata with latest log entry") {
val indexName = "test_latest_log_entry"
val latestId: String = Base64.getEncoder.encodeToString(indexName.getBytes)
val testCreateTime = 1234567890123L
val flintMetadataLogEntry = FlintMetadataLogEntry(
id = testLatestId,
id = latestId,
seqNo = UNASSIGNED_SEQ_NO,
primaryTerm = UNASSIGNED_PRIMARY_TERM,
createTime = testCreateTime,
Expand All @@ -65,28 +71,31 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
when(metadata.indexSettings).thenReturn(None)
when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry))

flintClient.createIndex(testFlintIndex, metadata)
flintClient.createIndex(indexName, metadata)
createLatestLogEntry(flintMetadataLogEntry)

val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry
val latest = flintClient.getIndexMetadata(indexName).latestLogEntry
latest.isDefined shouldBe true
latest.get.id shouldBe testLatestId
latest.get.id shouldBe latestId
latest.get.createTime shouldBe testCreateTime
latest.get.dataSource shouldBe testDataSourceName
latest.get.error shouldBe ""

deleteTestIndex(testFlintIndex)
flintClient.deleteIndex(indexName)
flintClient.exists(indexName) shouldBe false
}

test("should get empty metadata log entry") {
test("get index metadata without log entry") {
val indexName = "empty_log_entry"
val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn("{}")
when(metadata.indexSettings).thenReturn(None)
flintClient.createIndex(testFlintIndex, metadata)
flintClient.createIndex(indexName, metadata)

flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty
flintClient.getIndexMetadata(indexName).latestLogEntry shouldBe empty

deleteTestIndex(testFlintIndex)
flintClient.deleteIndex(indexName)
flintClient.exists(indexName) shouldBe false
}

test("should preserve original values when transition") {
Expand All @@ -101,7 +110,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
dataSource = testDataSourceName,
error = ""))

flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.id shouldBe testLatestId
Expand All @@ -125,7 +134,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
}

test("should transit from initial to final log if initial log is empty") {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe EMPTY
Expand All @@ -139,7 +148,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
}

test("should transit from initial to final log directly if no transient log") {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -161,7 +170,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
dataSource = testDataSourceName,
error = ""))

flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe ACTIVE
Expand All @@ -176,7 +185,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should exit if initial log entry doesn't meet precondition") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => false)
.transientLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -190,7 +199,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should fail if initial log entry updated by others when updating transient log entry") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => {
Expand All @@ -206,7 +215,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should fail if transient log entry updated by others when updating final log entry") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => {
Expand All @@ -224,7 +233,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
test("should rollback to initial log if transaction operation failed") {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = CREATING))
Expand All @@ -249,7 +258,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
error = ""))

the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => latest.copy(state = REFRESHING))
Expand All @@ -265,7 +274,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
"should not necessarily rollback if transaction operation failed but no transient action") {
// Use create index scenario in this test case
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -278,7 +287,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("forceInit translog, even index is deleted before startTransaction") {
deleteIndex(testMetaLogIndex)
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex, true)
.initialLog(latest => {
latest.id shouldBe testLatestId
Expand All @@ -298,7 +307,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should fail if index is deleted before initial operation") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => {
deleteIndex(testMetaLogIndex)
Expand All @@ -312,7 +321,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should fail if index is deleted before transient operation") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => true)
.transientLog(latest => {
Expand All @@ -326,7 +335,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

test("should fail if index is deleted before final operation") {
the[IllegalStateException] thrownBy {
flintClient
flintMetadataLogService
.startTransaction(testFlintIndex)
.initialLog(latest => true)
.transientLog(latest => { latest.copy(state = CREATING) })
Expand Down

0 comments on commit d1de5e6

Please sign in to comment.