From 32f581bfae65dcb6f52d41430aa6fd70fad55423 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 14 Jun 2024 12:09:39 -0700 Subject: [PATCH] rm flint os client dep on metadata log service Signed-off-by: Sean Kao --- .../opensearch/flint/core/FlintClient.java | 8 ++-- .../flint/core/FlintClientBuilder.java | 3 +- .../core/storage/FlintOpenSearchClient.java | 42 +++++-------------- .../opensearch/flint/spark/FlintSpark.scala | 31 +++++++++++++- .../flint/core/FlintMetadataLogITSuite.scala | 33 --------------- .../core/FlintOpenSearchClientSuite.scala | 4 +- 6 files changed, 47 insertions(+), 74 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index c9d0cad83..b9ef05851 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -5,10 +5,9 @@ package org.opensearch.flint.core; -import java.util.List; +import java.util.Map; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.flint.core.storage.FlintReader; import org.opensearch.flint.core.storage.FlintWriter; @@ -38,9 +37,10 @@ public interface FlintClient { * Retrieve all metadata for Flint index whose name matches the given pattern. * * @param indexNamePattern index name pattern - * @return all matched index metadata + * @return map where the keys are the matched index names, and the values are + * corresponding index metadata */ - List getAllIndexMetadata(String indexNamePattern); + Map getAllIndexMetadata(String indexNamePattern); /** * Retrieve metadata in a Flint index. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java index 006770581..a0372a86f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java @@ -6,7 +6,6 @@ package org.opensearch.flint.core; import org.opensearch.flint.core.storage.FlintOpenSearchClient; -import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService; /** * {@link FlintClient} builder. @@ -14,6 +13,6 @@ public class FlintClientBuilder { public static FlintClient build(FlintOptions options) { - return new FlintOpenSearchClient(options, new FlintOpenSearchMetadataLogService(options)); + return new FlintOpenSearchClient(options); } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index fe613d048..36db4a040 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -10,10 +10,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -33,10 +32,6 @@ import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.flint.core.metadata.FlintMetadata; -import org.opensearch.flint.core.metadata.log.FlintMetadataLog; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; -import org.opensearch.flint.core.metadata.log.OptimisticTransaction; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -68,15 +63,9 @@ public class FlintOpenSearchClient implements FlintClient { Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); private final FlintOptions options; - private final FlintMetadataLogService metadataLogService; - - public FlintOpenSearchClient(FlintOptions options, FlintMetadataLogService metadataLogService) { - this.options = options; - this.metadataLogService = metadataLogService; - } public FlintOpenSearchClient(FlintOptions options) { - this(options, new FlintOpenSearchMetadataLogService(options)); + this.options = options; } @Override @@ -112,7 +101,7 @@ public boolean exists(String indexName) { } @Override - public List getAllIndexMetadata(String indexNamePattern) { + public Map getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); try (IRestHighLevelClient client = createClient()) { @@ -120,11 +109,13 @@ public List getAllIndexMetadata(String indexNamePattern) { GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> constructFlintMetadata( - index, - response.getMappings().get(index).source().toString(), - response.getSettings().get(index).toString())) - .collect(Collectors.toList()); + .collect(Collectors.toMap( + index -> index, + index -> FlintMetadata.apply( + response.getMappings().get(index).source().toString(), + response.getSettings().get(index).toString() + ) + )); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); } @@ -140,7 +131,7 @@ public FlintMetadata getIndexMetadata(String indexName) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return constructFlintMetadata(indexName, mapping.source().string(), settings.toString()); + return FlintMetadata.apply(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } @@ -210,17 +201,6 @@ public IRestHighLevelClient createClient() { return OpenSearchClientUtils.createClient(options); } - /* - * Constructs Flint metadata with latest metadata log entry attached if it's available. - */ - private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) { - Optional latest = metadataLogService.getIndexMetadataLog(indexName) - .flatMap(FlintMetadataLog::getLatest); - return latest - .map(entry -> FlintMetadata.apply(mapping, settings, entry)) - .orElseGet(() -> FlintMetadata.apply(mapping, settings)); - } - /* * Because OpenSearch requires all lowercase letters in index name, we have to * lowercase all letters in the given Flint index name. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index d47d4cf51..712abd3af 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -10,7 +10,8 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} -import org.opensearch.flint.core.metadata.log.{FlintMetadataLogService, FlintMetadataLogServiceBuilder} +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.{FlintMetadataLog, FlintMetadataLogEntry, FlintMetadataLogService, FlintMetadataLogServiceBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN @@ -176,6 +177,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient .getAllIndexMetadata(indexNamePattern) .asScala + .map { case (indexName, metadata) => + attachLatestLogEntry(indexName, metadata) + } + .toList .flatMap(FlintSparkIndexFactory.create) } else { Seq.empty @@ -194,7 +199,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Describing index name $indexName") if (flintClient.exists(indexName)) { val metadata = flintClient.getIndexMetadata(indexName) - FlintSparkIndexFactory.create(metadata) + val metadataWithEntry = attachLatestLogEntry(indexName, metadata) + FlintSparkIndexFactory.create(metadataWithEntry) } else { Option.empty } @@ -386,6 +392,27 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } + /** + * Attaches latest log entry to metadata if available. + * + * @param indexName + * index name + * @param metadata + * base flint metadata + * @return + * flint metadata with latest log entry attached if available + */ + private def attachLatestLogEntry(indexName: String, metadata: FlintMetadata): FlintMetadata = { + val latest = flintMetadataLogService + .getIndexMetadataLog(indexName) + .flatMap(_.getLatest) + if (latest.isPresent) { + metadata.copy(latestLogEntry = Some(latest.get())) + } else { + metadata + } + } + /** * Validate the index update options are allowed. * @param originalOptions diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index b49622f18..1c5d60f15 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -9,9 +9,7 @@ import java.util.Base64 import scala.collection.JavaConverters._ -import org.mockito.Mockito.when import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.FlintMetadataLogService @@ -19,7 +17,6 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.core.storage.FlintOpenSearchMetadataLogService import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers -import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME @@ -92,34 +89,4 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex, true) metadataLog.isPresent shouldBe true } - - test("should get index metadata with latest log entry") { - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry)) - - flintClient.createIndex(testFlintIndex, metadata) - createLatestLogEntry(flintMetadataLogEntry) - - val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry - latest.isDefined shouldBe true - latest.get.id shouldBe testLatestId - latest.get.createTime shouldBe testCreateTime - latest.get.dataSource shouldBe testDataSourceName - latest.get.error shouldBe "" - - deleteTestIndex(testFlintIndex) - } - - test("should get index metadata without log entry") { - val metadata = mock[FlintMetadata] - when(metadata.getContent).thenReturn("{}") - when(metadata.indexSettings).thenReturn(None) - flintClient.createIndex(testFlintIndex, metadata) - - flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty - - deleteTestIndex(testFlintIndex) - } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index e245c43c7..1373654aa 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -124,8 +124,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 - allMetadata.forEach(metadata => metadata.getContent should not be empty) - allMetadata.forEach(metadata => metadata.indexSettings should not be empty) + allMetadata.values.forEach(metadata => metadata.getContent should not be empty) + allMetadata.values.forEach(metadata => metadata.indexSettings should not be empty) } it should "convert index name to all lowercase" in {