From 0933f916cb3ce874fa995bff63767baf68cfe912 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 4 Oct 2023 12:18:10 -0700 Subject: [PATCH] Lowercase index name in Flint client Signed-off-by: Chen Dai --- .../core/storage/FlintOpenSearchClient.java | 47 +++++++---- .../spark/FlintSparkIndexNameITSuite.scala | 81 +++++++++++++++++++ 2 files changed, 112 insertions(+), 16 deletions(-) create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index b35406707..b973385d8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -15,6 +15,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.http.HttpHost; @@ -30,8 +32,6 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; -import org.opensearch.client.indices.GetMappingsRequest; -import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; @@ -68,8 +68,9 @@ public FlintOpenSearchClient(FlintOptions options) { } @Override public void createIndex(String indexName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - CreateIndexRequest request = new CreateIndexRequest(indexName); + CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(metadata.getContent(), XContentType.JSON); if (metadata.getIndexSettings() != null) { @@ -77,21 +78,23 @@ public FlintOpenSearchClient(FlintOptions options) { } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { - throw new IllegalStateException("Failed to create Flint index " + indexName, e); + throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); } } @Override public boolean exists(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { - throw new IllegalStateException("Failed to check if Flint index exists " + indexName, e); + throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); } } @Override public List getAllIndexMetadata(String indexNamePattern) { + String osIndexNamePattern = toLowercase(indexNamePattern); try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexNamePattern); + GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) @@ -100,30 +103,32 @@ public FlintOpenSearchClient(FlintOptions options) { response.getSettings().get(index).toString())) .collect(Collectors.toList()); } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e); + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); } } @Override public FlintMetadata getIndexMetadata(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexName); + GetIndexRequest request = new GetIndexRequest(osIndexName); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - MappingMetadata mapping = response.getMappings().get(indexName); - Settings settings = response.getSettings().get(indexName); + MappingMetadata mapping = response.getMappings().get(osIndexName); + Settings settings = response.getSettings().get(osIndexName); return new FlintMetadata(mapping.source().string(), settings.toString()); } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e); + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } } @Override public void deleteIndex(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - DeleteIndexRequest request = new DeleteIndexRequest(indexName); + DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); client.indices().delete(request, RequestOptions.DEFAULT); } catch (Exception e) { - throw new IllegalStateException("Failed to delete Flint index " + indexName, e); + throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); } } @@ -144,7 +149,7 @@ public FlintOpenSearchClient(FlintOptions options) { queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); } return new OpenSearchScrollReader(createClient(), - indexName, + toLowercase(indexName), new SearchSourceBuilder().query(queryBuilder), options); } catch (IOException e) { @@ -153,7 +158,7 @@ public FlintOpenSearchClient(FlintOptions options) { } public FlintWriter createWriter(String indexName) { - return new OpenSearchWriter(createClient(), indexName, options.getRefreshPolicy()); + return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); } private RestHighLevelClient createClient() { @@ -194,4 +199,14 @@ private RestHighLevelClient createClient() { } return new RestHighLevelClient(restClientBuilder); } + + /* + * Because OpenSearch requires all lowercase letters in index name, we have to + * lowercase all letters in the given Flint index name. + */ + private String toLowercase(String indexName) { + Objects.requireNonNull(indexName); + + return indexName.toLowerCase(Locale.ROOT); + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala new file mode 100644 index 000000000..5b47edc46 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import scala.Option.empty + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.scalatest.matchers.must.Matchers.have +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import org.apache.spark.sql.Row + +class FlintSparkIndexNameITSuite extends FlintSparkSuite { + + /** Test table that has table name and column name with uppercase letter */ + private val testTable = "spark_catalog.default.Test" + + override def beforeAll(): Unit = { + super.beforeAll() + + sql(s""" + | CREATE TABLE $testTable + | ( + | Name STRING + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | VALUES ('Hello') + | """.stripMargin) + } + + test("skipping index with table and column name with uppercase letter") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( Name VALUE_SET) + |""".stripMargin) + + checkAnswer( + sql(s"DESC SKIPPING INDEX ON $testTable"), + Seq(Row("Name", "string", "VALUE_SET"))) + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + val flintIndexName = getSkippingIndexName(testTable) + val indexData = flint.queryIndex(flintIndexName).collect().toSet + indexData should have size 1 + + sql(s"DROP SKIPPING INDEX ON $testTable") + flint.describeIndex(flintIndexName) shouldBe empty + } + + test("covering index with index, table and column name with uppercase letter") { + val testIndex = "Idx_Name" + sql(s""" + | CREATE INDEX $testIndex ON $testTable (Name) + |""".stripMargin) + + checkAnswer(sql(s"SHOW INDEX ON $testTable"), Seq(Row(testIndex))) + checkAnswer( + sql(s"DESC INDEX $testIndex ON $testTable"), + Seq(Row("Name", "string", "indexed"))) + + sql(s"REFRESH INDEX $testIndex ON $testTable") + val flintIndexName = getFlintIndexName(testIndex, testTable) + val indexData = flint.queryIndex(flintIndexName).collect().toSet + indexData should have size 1 + + sql(s"DROP INDEX $testIndex ON $testTable") + flint.describeIndex(flintIndexName) shouldBe empty + } +}