From 48054101ae5c74c476d200f211a587fca26b6c8e Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 12 Jun 2024 11:22:46 -0700 Subject: [PATCH] metadata log test suite Signed-off-by: Sean Kao --- .../flint/core/FlintMetadataLogITSuite.scala | 122 ++++++++++++++++++ .../flint/core/FlintTransactionITSuite.scala | 50 ------- 2 files changed, 122 insertions(+), 50 deletions(-) create mode 100644 integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala new file mode 100644 index 000000000..bde344d45 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import java.util.Base64 + +import scala.collection.JavaConverters._ + +import org.mockito.Mockito.when +import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.FlintMetadataLogService +import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME + +class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { + + val testFlintIndex = "flint_test_index" + val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) + val testCreateTime = 1234567890123L + val flintMetadataLogEntry = FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = testCreateTime, + state = ACTIVE, + dataSource = testDataSourceName, + error = "") + + var flintMetadataLogService: FlintMetadataLogService = _ + var flintClient: FlintClient = _ + + override def beforeAll(): Unit = { + super.beforeAll() + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName) + val flintOptions = new FlintOptions(options.asJava) + flintMetadataLogService = new FlintOpenSearchMetadataLogService(flintOptions) + flintClient = new FlintOpenSearchClient(flintOptions, flintMetadataLogService) + } + + test("should fail if metadata log index doesn't exists") { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) + + the[IllegalStateException] thrownBy { + flintMetadataLogService.startTransaction(testFlintIndex) + } + } + + test("should get index metadata log without log entry") { + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe true + metadataLog.get.getLatest shouldBe empty + } + + test("should get index metadata log with log entry") { + createLatestLogEntry(flintMetadataLogEntry) + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe true + + val latest = metadataLog.get.getLatest + latest.isPresent shouldBe true + latest.get.id shouldBe testLatestId + latest.get.createTime shouldBe testCreateTime + latest.get.dataSource shouldBe testDataSourceName + latest.get.error shouldBe "" + } + + test("should not get index metadata log if not exist") { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) + metadataLog.isPresent shouldBe false + } + + test("should initialize index metadata log if forceInit") { + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> "non-exist-datasource") + val flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) + val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex, true) + metadataLog.isPresent shouldBe true + } + + test("should get index metadata with latest log entry") { + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) + when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry)) + + flintClient.createIndex(testFlintIndex, metadata) + createLatestLogEntry(flintMetadataLogEntry) + + val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry + latest.isDefined shouldBe true + latest.get.id shouldBe testLatestId + latest.get.createTime shouldBe testCreateTime + latest.get.dataSource shouldBe testDataSourceName + latest.get.error shouldBe "" + + deleteTestIndex(testFlintIndex) + } + + test("should get index metadata without log entry") { + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) + flintClient.createIndex(testFlintIndex, metadata) + + flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty + + deleteTestIndex(testFlintIndex) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 36cac027a..faff3341c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -11,17 +11,13 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.{JsonMethods, Serialization} -import org.mockito.Mockito.when import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.FlintMetadataLogService -import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME @@ -30,13 +26,11 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { val testFlintIndex = "flint_test_index" val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) var flintMetadataLogService: FlintMetadataLogService = _ - var flintClient: FlintClient = _ override def beforeAll(): Unit = { super.beforeAll() val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName) flintMetadataLogService = new FlintOpenSearchMetadataLogService(new FlintOptions(options.asJava)) - flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava), flintMetadataLogService) } test("empty metadata log entry content") { @@ -54,50 +48,6 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .commit(_ => {}) } - test("get index metadata with latest log entry") { - val indexName = "test_latest_log_entry" - val latestId: String = Base64.getEncoder.encodeToString(indexName.getBytes) - val testCreateTime = 1234567890123L - val flintMetadataLogEntry = FlintMetadataLogEntry( - id = latestId, - seqNo = UNASSIGNED_SEQ_NO, - primaryTerm = UNASSIGNED_PRIMARY_TERM, - createTime = testCreateTime, - state = ACTIVE, - dataSource = testDataSourceName, - error = "") - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry)) - - flintClient.createIndex(indexName, metadata) - createLatestLogEntry(flintMetadataLogEntry) - - val latest = flintClient.getIndexMetadata(indexName).latestLogEntry - latest.isDefined shouldBe true - latest.get.id shouldBe latestId - latest.get.createTime shouldBe testCreateTime - latest.get.dataSource shouldBe testDataSourceName - latest.get.error shouldBe "" - - flintClient.deleteIndex(indexName) - flintClient.exists(indexName) shouldBe false - } - - test("get index metadata without log entry") { - val indexName = "empty_log_entry" - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - flintClient.createIndex(indexName, metadata) - - flintClient.getIndexMetadata(indexName).latestLogEntry shouldBe empty - - flintClient.deleteIndex(indexName) - flintClient.exists(indexName) shouldBe false - } - test("should preserve original values when transition") { val testCreateTime = 1234567890123L createLatestLogEntry(