diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 59395137d..be8a4728b 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -18,7 +18,9 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -39,6 +41,7 @@
 import org.opensearch.client.indices.CreateIndexResponse;
 import org.opensearch.client.indices.GetIndexRequest;
 import org.opensearch.client.indices.GetIndexResponse;
+import org.opensearch.cluster.metadata.AliasMetadata;
 import org.opensearch.cluster.metadata.MappingMetadata;
 import org.opensearch.common.Strings;
 import org.opensearch.common.settings.Settings;
@@ -85,19 +88,18 @@ public void alias(String indexName, String aliasName, FlintMetadata metadata) {
       if (!exists) {
         // create index including the alias name, which is the Flint convention name
         createIndex(osIndexName, metadata);
-      } else {
-        // Adding the alias to the existing index
-        IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
-        IndicesAliasesRequest.AliasActions aliasAction =
-            new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
-                .index(osIndexName)
-                .alias(osAliasName);
-        aliasesRequest.addAliasAction(aliasAction);
-        // Executing the updateAliases request
-        AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
-        if (!response.isAcknowledged()) {
-          throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
-        }
+      }
+      // Adding the alias to the index, whether newly created or pre-existing
+      IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
+      IndicesAliasesRequest.AliasActions aliasAction =
+          new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
+              .index(osIndexName)
+              .alias(osAliasName);
+      aliasesRequest.addAliasAction(aliasAction);
+      // Executing the updateAliases request
+      AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
+      if (!response.isAcknowledged()) {
+        throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
       }
     } catch (Exception e) {
@@ -110,17 +112,16 @@ public void createIndex(String indexName, FlintMetadata metadata) {
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
       CreateIndexRequest request = new CreateIndexRequest(osIndexName);
-      boolean includeMappingProperties = true;
-      if(metadata.targetName().nonEmpty()) {
-        request.alias(new Alias(toLowercase(metadata.name())));
-        includeMappingProperties = false;
-      }
+      boolean includeMappingProperties = !metadata.targetName().nonEmpty();
       request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON);
       Option<String> settings = metadata.indexSettings();
       if (settings.isDefined()) {
         request.settings(settings.get(), XContentType.JSON);
       }
-      client.indices().create(request, RequestOptions.DEFAULT);
+      CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
+      if (!response.isAcknowledged()) {
+        throw new IllegalStateException(String.format("Failed to acknowledge create index %s", indexName));
+      }
     } catch (Exception e) {
       throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
     }
@@ -159,10 +160,24 @@ public FlintMetadata getIndexMetadata(String indexName) {
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
       GetIndexRequest request = new GetIndexRequest(osIndexName);
       GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
+      if (response.getMappings().containsKey(osIndexName)) {
+        MappingMetadata mapping = response.getMappings().get(osIndexName);
+        Settings settings = response.getSettings().get(osIndexName);
+        return FlintMetadata.apply(mapping.source().string(), settings.toString());
+      }
+      if (!response.getAliases().isEmpty()) {
+        Optional<String> aliasAncestor = response.getAliases().entrySet().stream()
+            .filter(entry -> entry.getValue().stream().anyMatch(alias -> alias.alias().equals(osIndexName)))
+            .map(Map.Entry::getKey)
+            .findFirst();
-      MappingMetadata mapping = response.getMappings().get(osIndexName);
-      Settings settings = response.getSettings().get(osIndexName);
-      return FlintMetadata.apply(mapping.source().string(), settings.toString());
+        if (aliasAncestor.isPresent()) {
+          MappingMetadata mapping = response.getMappings().get(aliasAncestor.get());
+          Settings settings = response.getSettings().get(aliasAncestor.get());
+          return FlintMetadata.apply(mapping.source().string(), settings.toString());
+        }
+      }
+      throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName);
     } catch (Exception e) {
       throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
     }
@@ -173,8 +188,10 @@ public void deleteIndex(String indexName) {
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
       DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);
-
-      client.indices().delete(request, RequestOptions.DEFAULT);
+      AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
+      if (!response.isAcknowledged()) {
+        throw new IllegalStateException(String.format("Failed to acknowledge delete index %s", indexName));
+      }
     } catch (Exception e) {
       throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e);
     }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 627e11f52..90b561398 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -25,6 +25,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
   /** Test table and index name */
   private val testTable = "spark_catalog.default.covering_sql_test"
   private val testIndex = "name_and_age"
+  private val targetIndex = "target_index"
   private val testFlintIndex = getFlintIndexName(testIndex, testTable)
 
   override def beforeAll(): Unit = {
@@ -32,7 +33,7 @@
     createPartitionedTable(testTable)
   }
-
+
   override def afterEach(): Unit = {
     super.afterEach()
@@ -93,7 +94,7 @@
     (settings \ "index.number_of_shards").extract[String] shouldBe "2"
     (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
   }
-
+
   test("create covering index with invalid option") {
     the[IllegalArgumentException] thrownBy
       sql(s"""
@@ -232,4 +233,27 @@
     flint.describeIndex(testFlintIndex) shouldBe empty
   }
+
+  test("use existing index as the covering index") {
+    sql(
+      s"""
+         | CREATE INDEX $testIndex ON $testTable USING $targetIndex ( name )
+         | WITH (
+         |   index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}'
+         | )
+         |""".stripMargin)
+
+    // Check if the index setting option is set to OS index setting
+    val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
+
+    implicit val formats: Formats = Serialization.formats(NoTypeHints)
+    var settings = parse(flintClient.getIndexMetadata(targetIndex).indexSettings.get)
+    (settings \ "index.number_of_shards").extract[String] shouldBe "2"
+    (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
+    // Validate the index alias resolves to the same settings
+    settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
+    (settings \ "index.number_of_shards").extract[String] shouldBe "2"
+    (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
+  }
+
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index e7fb7a1f6..27672bb7e 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -88,54 +88,6 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
        |""".stripMargin)
   }
 
-  test("create materialized view using existing OpenSearch index successfully") {
-    val indexOptions =
-      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
-    flint
-      .materializedView()
-      .targetName(testTargetIndex)
-      .name(testMvName)
-      .query(testQuery)
-      .options(indexOptions)
-      .create()
-
-    val index = flint.describeIndex("existing_index")
-    index shouldBe defined
-    index.get.metadata().getContent() should matchJson(s"""
-      | {
-      |   "_meta": {
-      |     "version": "${current()}",
-      |     "name": "spark_catalog.default.mv_test_metrics",
-      |     "kind": "mv",
-      |     "targetName": "$testTargetIndex",
-      |     "source": "$testQuery",
-      |     "indexedColumns": [
-      |     {
-      |       "columnName": "startTime",
-      |       "columnType": "timestamp"
-      |     },{
-      |       "columnName": "count",
-      |       "columnType": "long"
-      |     }],
-      |     "options": {
-      |       "auto_refresh": "true",
-      |       "checkpoint_location": "s3://test/"
-      |     },
-      |     "properties": {}
-      |   },
-      |   "properties": {
-      |     "startTime": {
-      |       "type": "date",
-      |       "format": "strict_date_optional_time_nanos"
-      |     },
-      |     "count": {
-      |       "type": "long"
-      |     }
-      |   }
-      | }
-      |""".stripMargin)
-  }
-
   // TODO: fix this windowing function unable to be used in GROUP BY
   ignore("full refresh materialized view") {
     flint
@@ -233,6 +185,55 @@
     }
   }
+
+  test("use existing OpenSearch index for materialized view successfully") {
+    val indexOptions =
+      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
+    flint
+      .materializedView()
+      .targetName(testTargetIndex)
+      .name(testMvName)
+      .query(testQuery)
+      .options(indexOptions)
+      .create()
+
+    val index = flint.describeIndex("existing_index")
+    index shouldBe defined
+    index.get.metadata().getContent() should matchJson(
+      s"""
+         | {
+         |   "_meta": {
+         |     "version": "${current()}",
+         |     "name": "spark_catalog.default.mv_test_metrics",
+         |     "kind": "mv",
+         |     "targetName": "$testTargetIndex",
+         |     "source": "$testQuery",
+         |     "indexedColumns": [
+         |     {
+         |       "columnName": "startTime",
+         |       "columnType": "timestamp"
+         |     },{
+         |       "columnName": "count",
+         |       "columnType": "long"
+         |     }],
+         |     "options": {
+         |       "auto_refresh": "true",
+         |       "checkpoint_location": "s3://test/"
+         |     },
+         |     "properties": {}
+         |   },
+         |   "properties": {
+         |     "startTime": {
+         |       "type": "date",
+         |       "format": "strict_date_optional_time_nanos"
+         |     },
+         |     "count": {
+         |       "type": "long"
+         |     }
+         |   }
+         | }
+         |""".stripMargin)
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 
   private def withIncrementalMaterializedView(query: String)(