From d3f72e60c8a59692c003d92a3db37bd991f32907 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 4 Oct 2023 15:30:59 -0700 Subject: [PATCH] Fix index name with uppercase letter issue (#60) * Lowercase index name in Flint client Signed-off-by: Chen Dai * Update doc Signed-off-by: Chen Dai * Add IT for Flint client Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- docs/index.md | 40 ++++++++- .../core/storage/FlintOpenSearchClient.java | 47 +++++++---- .../core/FlintOpenSearchClientSuite.scala | 24 ++++++ .../spark/FlintSparkIndexNameITSuite.scala | 81 +++++++++++++++++++ 4 files changed, 172 insertions(+), 20 deletions(-) create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala diff --git a/docs/index.md b/docs/index.md index cd52051a1..44b0052b0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,9 +20,9 @@ Please see the following example in which Index Building Logic and Query Rewrite | Skipping Index | Create Index Statement | Index Building Logic | Query Rewrite Logic | |----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Partition | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  year PARTITION,
  month PARTITION,
  day PARTITION,
  hour PARTITION
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  FIRST(year) AS year,
  FIRST(month) AS month,
  FIRST(day) AS day,
  FIRST(hour) AS hour,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE year = 2023 AND month = 4
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE year = 2023 AND month = 4
)
WHERE year = 2023 AND month = 4 | -| ValueSet | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  elb_status_code VALUE_SET
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  COLLECT_SET(elb_status_code) AS elb_status_code,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE elb_status_code = 404
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE ARRAY_CONTAINS(elb_status_code, 404)
)
WHERE elb_status_code = 404 | -| MinMax | CREATE SKIPPING INDEX
ON alb_logs
FOR COLUMNS (
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 +| Partition | CREATE SKIPPING INDEX
ON alb_logs
(
  year PARTITION,
  month PARTITION,
  day PARTITION,
  hour PARTITION
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  FIRST(year) AS year,
  FIRST(month) AS month,
  FIRST(day) AS day,
  FIRST(hour) AS hour,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE year = 2023 AND month = 4
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE year = 2023 AND month = 4
)
WHERE year = 2023 AND month = 4 | +| ValueSet | CREATE SKIPPING INDEX
ON alb_logs
(
  elb_status_code VALUE_SET
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  COLLECT_SET(elb_status_code) AS elb_status_code,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE elb_status_code = 404
=>
SELECT *
FROM alb_logs (input_files =
  SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE ARRAY_CONTAINS(elb_status_code, 404)
)
WHERE elb_status_code = 404 | +| MinMax | CREATE SKIPPING INDEX
ON alb_logs
(
  request_processing_time MIN_MAX
) | INSERT INTO flint_alb_logs_skipping_index
SELECT
  MIN(request_processing_time) AS request_processing_time_min,
  MAX(request_processing_time) AS request_processing_time_max,
  input_file_name() AS file_path
FROM alb_logs
GROUP BY
  input_file_name() | SELECT *
FROM alb_logs
WHERE request_processing_time = 100
=>
SELECT *
FROM alb_logs (input_files =
SELECT file_path
  FROM flint_alb_logs_skipping_index
  WHERE request_processing_time_min <= 100
    AND 100 <= request_processing_time_max
)
WHERE request_processing_time = 100 ### Flint Index Specification @@ -223,7 +223,23 @@ WITH ( ### OpenSearch -OpenSearch stores the Flint index in an OpenSearch index of the given name. +OpenSearch index corresponding to the Flint index follows the naming convention below: + +1. Skipping index: `flint_[catalog_database_table]_skipping_index` +2. Covering index: `flint_[catalog_database_table]_[index_name]_index` + +It's important to note that any uppercase letters in the index name and table name (catalog, database and table) in SQL statement will be automatically converted to lowercase due to restriction imposed by OpenSearch. + +Examples: + +```sql +-- OpenSearch index name is `flint_spark_catalog_default_alb_logs_skipping_index` +CREATE SKIPPING INDEX ON spark_catalog.default.alb_logs ... + +-- OpenSearch index name is `flint_spark_catalog_default_alb_logs_elb_and_requesturi_index` +CREATE INDEX elb_and_requestUri ON spark_catalog.default.alb_logs ... +``` + In the index mapping, the `_meta` and `properties`field stores meta and schema info of a Flint index. ```json @@ -390,6 +406,22 @@ TODO ## Limitations +### Flint Index Naming + +Due to the conversion of uppercase letters to lowercase in OpenSearch index names, it is not permissible to create a Flint index with a table name or index name that differs solely by case. + +For instance, only one of the statement per group can be successfully: + +```sql +-- myGlue vs myglue +CREATE SKIPPING INDEX ON myGlue.default.alb_logs ... +CREATE SKIPPING INDEX ON myglue.default.alb_logs ... + +-- idx_elb vs Idx_elb +CREATE INDEX idx_elb ON alb_logs ... +CREATE INDEX Idx_elb ON alb_logs ... +``` + ### Query Optimization For now, only single or conjunct conditions (conditions connected by AND) in WHERE clause can be optimized by skipping index. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index b35406707..b973385d8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -15,6 +15,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.http.HttpHost; @@ -30,8 +32,6 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetIndexResponse; -import org.opensearch.client.indices.GetMappingsRequest; -import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; @@ -68,8 +68,9 @@ public FlintOpenSearchClient(FlintOptions options) { } @Override public void createIndex(String indexName, FlintMetadata metadata) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - CreateIndexRequest request = new CreateIndexRequest(indexName); + CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(metadata.getContent(), XContentType.JSON); if (metadata.getIndexSettings() != null) { @@ -77,21 +78,23 @@ public FlintOpenSearchClient(FlintOptions options) { } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { - throw new IllegalStateException("Failed to create Flint index " + indexName, e); + throw new IllegalStateException("Failed to create Flint index " + osIndexName, e); } } @Override public boolean exists(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - return client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { - throw new IllegalStateException("Failed to check if Flint index exists " + indexName, e); + throw new IllegalStateException("Failed to check if Flint index exists " + osIndexName, e); } } @Override public List getAllIndexMetadata(String indexNamePattern) { + String osIndexNamePattern = toLowercase(indexNamePattern); try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexNamePattern); + GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) @@ -100,30 +103,32 @@ public FlintOpenSearchClient(FlintOptions options) { response.getSettings().get(index).toString())) .collect(Collectors.toList()); } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e); + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexNamePattern, e); } } @Override public FlintMetadata getIndexMetadata(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexName); + GetIndexRequest request = new GetIndexRequest(osIndexName); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - MappingMetadata mapping = response.getMappings().get(indexName); - Settings settings = response.getSettings().get(indexName); + MappingMetadata mapping = response.getMappings().get(osIndexName); + Settings settings = response.getSettings().get(osIndexName); return new FlintMetadata(mapping.source().string(), settings.toString()); } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e); + throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } } @Override public void deleteIndex(String indexName) { + String osIndexName = toLowercase(indexName); try (RestHighLevelClient client = createClient()) { - DeleteIndexRequest request = new DeleteIndexRequest(indexName); + DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); client.indices().delete(request, RequestOptions.DEFAULT); } catch (Exception e) { - throw new IllegalStateException("Failed to delete Flint index " + indexName, e); + throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e); } } @@ -144,7 +149,7 @@ public FlintOpenSearchClient(FlintOptions options) { queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); } return new OpenSearchScrollReader(createClient(), - indexName, + toLowercase(indexName), new SearchSourceBuilder().query(queryBuilder), options); } catch (IOException e) { @@ -153,7 +158,7 @@ public FlintOpenSearchClient(FlintOptions options) { } public FlintWriter createWriter(String indexName) { - return new OpenSearchWriter(createClient(), indexName, options.getRefreshPolicy()); + return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); } private RestHighLevelClient createClient() { @@ -194,4 +199,14 @@ private RestHighLevelClient createClient() { } return new RestHighLevelClient(restClientBuilder); } + + /* + * Because OpenSearch requires all lowercase letters in index name, we have to + * lowercase all letters in the given Flint index name. + */ + private String toLowercase(String indexName) { + Objects.requireNonNull(indexName); + + return indexName.toLowerCase(Locale.ROOT); + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index d2c7f3969..9d34b6f2a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -73,6 +73,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty) } + it should "convert index name to all lowercase" in { + val indexName = "flint_ELB_logs_index" + flintClient.createIndex( + indexName, + new FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName) should not be null + flintClient.getAllIndexMetadata("flint_ELB_*") should not be empty + + // Read write test + val writer = flintClient.createWriter(indexName) + writer.write("""{"create":{}}""") + writer.write("\n") + writer.write("""{"test":1}""") + writer.write("\n") + writer.flush() + writer.close() + flintClient.createReader(indexName, "").hasNext shouldBe true + + flintClient.deleteIndex(indexName) + flintClient.exists(indexName) shouldBe false + } + it should "return false if index not exist" in { flintClient.exists("non-exist-index") shouldBe false } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala new file mode 100644 index 000000000..5b47edc46 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexNameITSuite.scala @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import scala.Option.empty + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.scalatest.matchers.must.Matchers.have +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import org.apache.spark.sql.Row + +class FlintSparkIndexNameITSuite extends FlintSparkSuite { + + /** Test table that has table name and column name with uppercase letter */ + private val testTable = "spark_catalog.default.Test" + + override def beforeAll(): Unit = { + super.beforeAll() + + sql(s""" + | CREATE TABLE $testTable + | ( + | Name STRING + | ) + | USING CSV + | OPTIONS ( + | header 'false', + | delimiter '\t' + | ) + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | VALUES ('Hello') + | """.stripMargin) + } + + test("skipping index with table and column name with uppercase letter") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( Name VALUE_SET) + |""".stripMargin) + + checkAnswer( + sql(s"DESC SKIPPING INDEX ON $testTable"), + Seq(Row("Name", "string", "VALUE_SET"))) + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + val flintIndexName = getSkippingIndexName(testTable) + val indexData = flint.queryIndex(flintIndexName).collect().toSet + indexData should have size 1 + + sql(s"DROP SKIPPING INDEX ON $testTable") + flint.describeIndex(flintIndexName) shouldBe empty + } + + test("covering index with index, table and column name with uppercase letter") { + val testIndex = "Idx_Name" + sql(s""" + | CREATE INDEX $testIndex ON $testTable (Name) + |""".stripMargin) + + checkAnswer(sql(s"SHOW INDEX ON $testTable"), Seq(Row(testIndex))) + checkAnswer( + sql(s"DESC INDEX $testIndex ON $testTable"), + Seq(Row("Name", "string", "indexed"))) + + sql(s"REFRESH INDEX $testIndex ON $testTable") + val flintIndexName = getFlintIndexName(testIndex, testTable) + val indexData = flint.queryIndex(flintIndexName).collect().toSet + indexData should have size 1 + + sql(s"DROP INDEX $testIndex ON $testTable") + flint.describeIndex(flintIndexName) shouldBe empty + } +}