Skip to content

Commit

Permalink
Lowercase index name in Flint client
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 4, 2023
1 parent 6d87b81 commit 0933f91
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
Expand All @@ -30,8 +32,6 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -68,30 +68,33 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override public void createIndex(String indexName, FlintMetadata metadata) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(metadata.getContent(), XContentType.JSON);

if (metadata.getIndexSettings() != null) {
request.settings(metadata.getIndexSettings(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index " + indexName, e);
throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
}
}

@Override public boolean exists(String indexName) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists " + indexName, e);
throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e);
}
}

@Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
String osIndexNamePattern = toLowercase(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(indexNamePattern);
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
Expand All @@ -100,30 +103,32 @@ public FlintOpenSearchClient(FlintOptions options) {
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e);
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e);
}
}

@Override public FlintMetadata getIndexMetadata(String indexName) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(indexName);
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

MappingMetadata mapping = response.getMappings().get(indexName);
Settings settings = response.getSettings().get(indexName);
MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return new FlintMetadata(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e);
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
}

@Override public void deleteIndex(String indexName) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

client.indices().delete(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to delete Flint index " + indexName, e);
throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e);
}
}

Expand All @@ -144,7 +149,7 @@ public FlintOpenSearchClient(FlintOptions options) {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
indexName,
toLowercase(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
} catch (IOException e) {
Expand All @@ -153,7 +158,7 @@ public FlintOpenSearchClient(FlintOptions options) {
}

public FlintWriter createWriter(String indexName) {
return new OpenSearchWriter(createClient(), indexName, options.getRefreshPolicy());
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
}

private RestHighLevelClient createClient() {
Expand Down Expand Up @@ -194,4 +199,14 @@ private RestHighLevelClient createClient() {
}
return new RestHighLevelClient(restClientBuilder);
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.
*/
private String toLowercase(String indexName) {
Objects.requireNonNull(indexName);

return indexName.toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import scala.Option.empty

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.have
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row

class FlintSparkIndexNameITSuite extends FlintSparkSuite {

/** Test table that has table name and column name with uppercase letter */
private val testTable = "spark_catalog.default.Test"

override def beforeAll(): Unit = {
super.beforeAll()

sql(s"""
| CREATE TABLE $testTable
| (
| Name STRING
| )
| USING CSV
| OPTIONS (
| header 'false',
| delimiter '\t'
| )
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES ('Hello')
| """.stripMargin)
}

test("skipping index with table and column name with uppercase letter") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| ( Name VALUE_SET)
|""".stripMargin)

checkAnswer(
sql(s"DESC SKIPPING INDEX ON $testTable"),
Seq(Row("Name", "string", "VALUE_SET")))

sql(s"REFRESH SKIPPING INDEX ON $testTable")
val flintIndexName = getSkippingIndexName(testTable)
val indexData = flint.queryIndex(flintIndexName).collect().toSet
indexData should have size 1

sql(s"DROP SKIPPING INDEX ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}

test("covering index with index, table and column name with uppercase letter") {
val testIndex = "Idx_Name"
sql(s"""
| CREATE INDEX $testIndex ON $testTable (Name)
|""".stripMargin)

checkAnswer(sql(s"SHOW INDEX ON $testTable"), Seq(Row(testIndex)))
checkAnswer(
sql(s"DESC INDEX $testIndex ON $testTable"),
Seq(Row("Name", "string", "indexed")))

sql(s"REFRESH INDEX $testIndex ON $testTable")
val flintIndexName = getFlintIndexName(testIndex, testTable)
val indexData = flint.queryIndex(flintIndexName).collect().toSet
indexData should have size 1

sql(s"DROP INDEX $testIndex ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}
}

0 comments on commit 0933f91

Please sign in to comment.