Skip to content

Commit

Permalink
getAllIndexMetadata by pattern becomes optional (#682)
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
(cherry picked from commit 05e7f23)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 24, 2024
1 parent 3e0c5f9 commit f3295f4
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ public interface FlintIndexMetadataService {
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Whether the service supports retrieving metadata for Flint indexes by index pattern.
*
* @return true if supported, otherwise false
*/
boolean supportsGetByIndexPattern();

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
* If get by index pattern is not supported, then the provided names must be full index names.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* @param indexNamePatterns index full names or patterns
* @return map where the keys are the (matched) index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePatterns);

/**
* Update metadata for a Flint index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opensearch.flint.common.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintWriter;

import java.util.List;

/**
* Flint index client that provides API for metadata and data operations
* on a Flint index regardless of concrete storage.
Expand All @@ -30,6 +32,14 @@ public interface FlintClient {
*/
boolean exists(String indexName);

/**
* Get all index names that match the given pattern.
*
* @param indexNamePatterns index name patterns
* @return list of index names
*/
List<String> getIndexNames(String... indexNamePatterns);

/**
* Delete a Flint index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.common.metadata.FlintMetadata;
Expand All @@ -18,7 +19,10 @@
import scala.Option;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Flint client implementation for OpenSearch storage.
Expand Down Expand Up @@ -65,6 +69,19 @@ public boolean exists(String indexName) {
}
}

@Override
public List<String> getIndexNames(String... indexNamePatterns) {
LOG.info("Getting Flint index names for pattern " + String.join(",", indexNamePatterns));
String[] osIndexNamePatterns = Arrays.stream(indexNamePatterns).map(this::sanitizeIndexName).toArray(String[]::new);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePatterns);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);
return Arrays.stream(response.getIndices()).collect(Collectors.toList());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index names for pattern " + String.join(", ", indexNamePatterns), e);
}
}

@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
}
}

override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = {
logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}");
val indexNames = indexNamePattern.map(OpenSearchClientUtils.sanitizeIndexName)
override def supportsGetByIndexPattern(): Boolean = true

override def getAllIndexMetadata(
indexNamePatterns: String*): util.Map[String, FlintMetadata] = {
logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePatterns.mkString(",")}");
val indexNames = indexNamePatterns.map(OpenSearchClientUtils.sanitizeIndexName)
var client: IRestHighLevelClient = null
try {
client = OpenSearchClientUtils.createClient(options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
logInfo(s"Describing indexes with pattern $indexNamePattern")
if (flintClient.exists(indexNamePattern)) {
flintIndexMetadataService
.getAllIndexMetadata(indexNamePattern)
.asScala
getAllIndexMetadata(indexNamePattern)
.map { case (indexName, metadata) =>
attachLatestLogEntry(indexName, metadata)
}
Expand Down Expand Up @@ -368,6 +366,21 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
}

private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
if (flintIndexMetadataService.supportsGetByIndexPattern) {
flintIndexMetadataService
.getAllIndexMetadata(indexNamePattern)
.asScala
.toMap
} else {
val indexNames = flintClient.getIndexNames(indexNamePattern).asScala.toArray
flintIndexMetadataService
.getAllIndexMetadata(indexNames: _*)
.asScala
.toMap
}
}

/**
* Attaches latest log entry to metadata if available.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

it should "get all index names with the given index name pattern" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize(
"""{"properties": {"test": { "type": "integer" } } }""")
flintClient.createIndex("flint_test_1_index", metadata)
flintClient.createIndex("flint_test_2_index", metadata)

val indexNames = flintClient.getIndexNames("flint_*_index")
indexNames should have size 2
indexNames should contain allOf ("flint_test_1_index", "flint_test_2_index")
}

it should "convert index name to all lowercase" in {
val indexName = "flint_ELB_logs_index"
flintClient.createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ class FlintOpenSearchIndexMetadataServiceITSuite
}

class TestIndexMetadataService extends FlintIndexMetadataService {
override def getIndexMetadata(indexName: String): FlintMetadata = {
null
}
override def getIndexMetadata(indexName: String): FlintMetadata = null

override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = {
override def supportsGetByIndexPattern(): Boolean = true

override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] =
null
}

override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.util

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.scalatest.matchers.should.Matchers

import org.apache.spark.internal.Logging
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkIndexDescribeITSuite extends FlintSparkSuite with Matchers {

/** Test table and index name */
private val testTable = "spark_catalog.default.covering_sql_test"
private val testIndexMatch1 = "name_and_age_1"
private val testIndexMatch2 = "name_and_age_2"
private val testIndexOther = "address"
private val testFlintIndexMatch1 =
FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch1, testTable)
private val testFlintIndexMatch2 =
FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch2, testTable)
private val testFlintIndexOther =
FlintSparkCoveringIndex.getFlintIndexName(testIndexOther, testTable)

override def beforeEach(): Unit = {
super.beforeEach()

createPartitionedAddressTable(testTable)

flint
.coveringIndex()
.name(testIndexMatch1)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

flint
.coveringIndex()
.name(testIndexMatch2)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

flint
.coveringIndex()
.name(testIndexOther)
.onTable(testTable)
.addIndexColumns("address")
.create()
}

override def afterEach(): Unit = {
super.afterEach()

// Delete all test indices
deleteTestIndex(testFlintIndexMatch1, testFlintIndexMatch2, testFlintIndexOther)
sql(s"DROP TABLE $testTable")
}

test("describe all indexes matching a pattern") {
val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable)
val indexes = flint.describeIndexes(indexNamePattern)
indexes should have size 2
indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2)
}

test(
"describe all indexes matching a pattern with custom index metadata service implementation without get-by-pattern support") {
setFlintSparkConf(
FlintSparkConf.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS,
classOf[NoGetByPatternSupportIndexMetadataService].getName)
val testFlint = new FlintSpark(spark)
val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable)
val indexes = testFlint.describeIndexes(indexNamePattern)
indexes should have size 2
indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2)
}
}

class NoGetByPatternSupportIndexMetadataService extends FlintIndexMetadataService with Logging {

/**
* Cannot directly extend FlintOpenSearchIndexMetadataService because
* FlintIndexMetadataServiceBuilder expects custom implementation takes no arguments in its
* constructor
*/
private val indexMetadataService = new FlintOpenSearchIndexMetadataService(
FlintSparkConf().flintOptions())

override def supportsGetByIndexPattern(): Boolean = false

/**
* Does not match index names by pattern. The input is expected to be a list of full index names
*/
override def getAllIndexMetadata(indexNames: String*): util.Map[String, FlintMetadata] = {
logInfo(s"Fetching all Flint index metadata for indexes ${indexNames.mkString(",")}");
indexNames
.map(index => index -> getIndexMetadata(index))
.toMap
.asJava
}

override def getIndexMetadata(indexName: String): FlintMetadata =
indexMetadataService.getIndexMetadata(indexName)

override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit =
indexMetadataService.updateIndexMetadata(indexName, metadata)

override def deleteIndexMetadata(indexName: String): Unit =
indexMetadataService.deleteIndexMetadata(indexName)
}

0 comments on commit f3295f4

Please sign in to comment.