From c7b7028c6113e7a737f288c1ca0776ad291e287c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 23 Sep 2024 21:50:20 +0000 Subject: [PATCH] getAllIndexMetadata by pattern becomes optional (#682) Signed-off-by: Sean Kao (cherry picked from commit 05e7f237082fef5db79d3d994fb46b7cf9595469) Signed-off-by: github-actions[bot] --- .../metadata/FlintIndexMetadataService.java | 14 +- .../opensearch/flint/core/FlintClient.java | 10 ++ .../core/storage/FlintOpenSearchClient.java | 17 +++ .../FlintOpenSearchIndexMetadataService.scala | 9 +- .../opensearch/flint/spark/FlintSpark.scala | 19 ++- .../core/FlintOpenSearchClientSuite.scala | 11 ++ ...penSearchIndexMetadataServiceITSuite.scala | 9 +- .../FlintSparkIndexDescribeITSuite.scala | 120 ++++++++++++++++++ 8 files changed, 195 insertions(+), 14 deletions(-) create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java index b990998a9..a31fc9a78 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java @@ -25,14 +25,22 @@ public interface FlintIndexMetadataService { */ FlintMetadata getIndexMetadata(String indexName); + /** + * Whether the service supports retrieving metadata for Flint indexes by index pattern. + * + * @return true if supported, otherwise false + */ + boolean supportsGetByIndexPattern(); + /** * Retrieve all metadata for Flint index whose name matches the given pattern. + * If get by index pattern is not supported, then the provided names must be full index names. 
* - * @param indexNamePattern index name pattern - * @return map where the keys are the matched index names, and the values are + * @param indexNamePatterns full index names or patterns + * @return map where the keys are the (matched) index names, and the values are * corresponding index metadata */ - Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern); + Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePatterns); /** * Update metadata for a Flint index. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 29b5f6de9..6ce344c3f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -8,6 +8,8 @@ import org.opensearch.flint.common.metadata.FlintMetadata; import org.opensearch.flint.core.storage.FlintWriter; +import java.util.List; + /** * Flint index client that provides API for metadata and data operations * on a Flint index regardless of concrete storage. @@ -30,6 +32,14 @@ public interface FlintClient { */ boolean exists(String indexName); + /** + * Get all index names that match the given patterns. + * + * @param indexNamePatterns index name patterns + * @return list of index names + */ + List<String> getIndexNames(String... indexNamePatterns); + /** * Delete a Flint index. 
* * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index ef97f65ac..da22e3751 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -9,6 +9,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.common.metadata.FlintMetadata; @@ -18,7 +19,10 @@ import scala.Option; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Flint client implementation for OpenSearch storage. @@ -65,6 +69,19 @@ public boolean exists(String indexName) { } } + @Override + public List<String> getIndexNames(String... 
indexNamePatterns) { + LOG.info("Getting Flint index names for pattern " + String.join(",", indexNamePatterns)); + String[] osIndexNamePatterns = Arrays.stream(indexNamePatterns).map(this::sanitizeIndexName).toArray(String[]::new); + try (IRestHighLevelClient client = createClient()) { + GetIndexRequest request = new GetIndexRequest(osIndexNamePatterns); + GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); + return Arrays.stream(response.getIndices()).collect(Collectors.toList()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index names for pattern " + String.join(", ", indexNamePatterns), e); + } + } + @Override public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala index fad2f1b63..765460da7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala @@ -46,9 +46,12 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions) } } - override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = { - logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}"); - val indexNames = indexNamePattern.map(OpenSearchClientUtils.sanitizeIndexName) + override def supportsGetByIndexPattern(): Boolean = true + + override def getAllIndexMetadata( + indexNamePatterns: String*): util.Map[String, FlintMetadata] = { + logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePatterns.mkString(",")}"); + val indexNames = indexNamePatterns.map(OpenSearchClientUtils.sanitizeIndexName) var client: IRestHighLevelClient = null try { 
client = OpenSearchClientUtils.createClient(options) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 88b8c38cc..fa0835d86 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -168,9 +168,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = { logInfo(s"Describing indexes with pattern $indexNamePattern") if (flintClient.exists(indexNamePattern)) { - flintIndexMetadataService - .getAllIndexMetadata(indexNamePattern) - .asScala + getAllIndexMetadata(indexNamePattern) .map { case (indexName, metadata) => attachLatestLogEntry(indexName, metadata) } @@ -368,6 +366,21 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w } } + private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = { + if (flintIndexMetadataService.supportsGetByIndexPattern) { + flintIndexMetadataService + .getAllIndexMetadata(indexNamePattern) + .asScala + .toMap + } else { + val indexNames = flintClient.getIndexNames(indexNamePattern).asScala.toArray + flintIndexMetadataService + .getAllIndexMetadata(indexNames: _*) + .asScala + .toMap + } + } + /** * Attaches latest log entry to metadata if available. 
* diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 2dc6016b2..a2c2d26f6 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -65,6 +65,17 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + it should "get all index names with the given index name pattern" in { + val metadata = FlintOpenSearchIndexMetadataService.deserialize( + """{"properties": {"test": { "type": "integer" } } }""") + flintClient.createIndex("flint_test_1_index", metadata) + flintClient.createIndex("flint_test_2_index", metadata) + + val indexNames = flintClient.getIndexNames("flint_*_index") + indexNames should have size 2 + indexNames should contain allOf ("flint_test_1_index", "flint_test_2_index") + } + it should "convert index name to all lowercase" in { val indexName = "flint_ELB_logs_index" flintClient.createIndex( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala index c5bd75951..c7256013c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala @@ -108,13 +108,12 @@ class FlintOpenSearchIndexMetadataServiceITSuite } class TestIndexMetadataService extends FlintIndexMetadataService { - override def getIndexMetadata(indexName: String): FlintMetadata = { - null - } + override def getIndexMetadata(indexName: String): FlintMetadata = 
null - override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = { + override def supportsGetByIndexPattern(): Boolean = true + + override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = null - } override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala new file mode 100644 index 000000000..594765b8a --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.flint.config.FlintSparkConf + +class FlintSparkIndexDescribeITSuite extends FlintSparkSuite with Matchers { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.covering_sql_test" + private val testIndexMatch1 = "name_and_age_1" + private val testIndexMatch2 = "name_and_age_2" + private val testIndexOther = "address" + private val testFlintIndexMatch1 = + FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch1, testTable) + private val testFlintIndexMatch2 = + FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch2, testTable) + private val testFlintIndexOther = + FlintSparkCoveringIndex.getFlintIndexName(testIndexOther, testTable) + + 
override def beforeEach(): Unit = { + super.beforeEach() + + createPartitionedAddressTable(testTable) + + flint + .coveringIndex() + .name(testIndexMatch1) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + flint + .coveringIndex() + .name(testIndexMatch2) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + flint + .coveringIndex() + .name(testIndexOther) + .onTable(testTable) + .addIndexColumns("address") + .create() + } + + override def afterEach(): Unit = { + super.afterEach() + + // Delete all test indices + deleteTestIndex(testFlintIndexMatch1, testFlintIndexMatch2, testFlintIndexOther) + sql(s"DROP TABLE $testTable") + } + + test("describe all indexes matching a pattern") { + val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable) + val indexes = flint.describeIndexes(indexNamePattern) + indexes should have size 2 + indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2) + } + + test( + "describe all indexes matching a pattern with custom index metadata service implementation without get-by-pattern support") { + setFlintSparkConf( + FlintSparkConf.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, + classOf[NoGetByPatternSupportIndexMetadataService].getName) + val testFlint = new FlintSpark(spark) + val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable) + val indexes = testFlint.describeIndexes(indexNamePattern) + indexes should have size 2 + indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2) + } +} + +class NoGetByPatternSupportIndexMetadataService extends FlintIndexMetadataService with Logging { + + /** + * Cannot directly extend FlintOpenSearchIndexMetadataService because + * FlintIndexMetadataServiceBuilder expects a custom implementation to take no arguments in its + * constructor + */ + private val indexMetadataService = new FlintOpenSearchIndexMetadataService( 
FlintSparkConf().flintOptions()) + + override def supportsGetByIndexPattern(): Boolean = false + + /** + * Does not match index names by pattern. The input is expected to be a list of full index names + */ + override def getAllIndexMetadata(indexNames: String*): util.Map[String, FlintMetadata] = { + logInfo(s"Fetching all Flint index metadata for indexes ${indexNames.mkString(",")}"); + indexNames + .map(index => index -> getIndexMetadata(index)) + .toMap + .asJava + } + + override def getIndexMetadata(indexName: String): FlintMetadata = + indexMetadataService.getIndexMetadata(indexName) + + override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = + indexMetadataService.updateIndexMetadata(indexName, metadata) + + override def deleteIndexMetadata(indexName: String): Unit = + indexMetadataService.deleteIndexMetadata(indexName) +}