From 5720e54d307e39a341fd15b8d821607d239616e3 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Mon, 9 Dec 2024 12:28:11 -0800 Subject: [PATCH 1/3] Add retryable AOSS HTTP response (#951) * add retryable response message Signed-off-by: Sean Kao * check retryable response only if 400 Signed-off-by: Sean Kao * add handler for aoss only Signed-off-by: Sean Kao * edit comment Signed-off-by: Sean Kao * bugfix: aoss result predicate not used Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .../flint/core/http/FlintRetryOptions.java | 18 ++++- .../http/handler/HttpAOSSResultPredicate.java | 66 +++++++++++++++++++ .../http/RetryableHttpAsyncClientSuite.scala | 30 +++++++++ 3 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 8f6e2c07e..597f441ec 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -6,8 +6,12 @@ package org.opensearch.flint.core.http; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES; import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; import dev.failsafe.event.ExecutionAttemptedEvent; import dev.failsafe.function.CheckedPredicate; import java.time.Duration; @@ -16,6 +20,7 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; +import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -65,7 +70,7 @@ public boolean isRetryEnabled() { * @return Failsafe retry policy */ public <T> RetryPolicy<T> getRetryPolicy() { - return RetryPolicy.<T>builder() + RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder() // Backoff strategy config (can be configurable as needed in future) .withBackoff(1, 30, SECONDS) .withJitter(Duration.ofMillis(100)) @@ -75,8 +80,11 @@ public <T> RetryPolicy<T> getRetryPolicy() { .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) - .onRetry(FlintRetryOptions::onRetry) - .build(); + .onRetry(FlintRetryOptions::onRetry); + if (SERVICE_NAME_AOSS.equals(getServiceName())) { + builder.handleResultIf(new HttpAOSSResultPredicate<>()); + } + return builder.build(); } public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) { @@ -101,6 +109,10 @@ private static <T> void onRetry(ExecutionAttemptedEvent<T> event) { LOG.warning("Retrying failed request at #" + event.getAttemptCount()); } + private String getServiceName() { + return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES); + } + /** * @return maximum retry option value */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java new file mode 100644 index 000000000..8bfb05fa3 --- /dev/null +++ 
b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.logging.Logger; + +/** + * Failure handler based on HTTP response from AOSS. + * + * @param <T> result type (supposed to be HttpResponse for OS client) + */ +public class HttpAOSSResultPredicate<T> implements CheckedPredicate<T> { + + private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName()); + + public static final int BAD_REQUEST_STATUS_CODE = 400; + public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception"; + + public HttpAOSSResultPredicate() { } + + @Override + public boolean test(T result) throws Throwable { + LOG.info("Checking if response is retryable"); + + int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode(); + if (statusCode != BAD_REQUEST_STATUS_CODE) { + LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false"); + return false; + } + + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity == null) { + LOG.info("No response entity found. Check result: false"); + return false; + } + + // Buffer the entity to make it repeatable, so that this retry test does not consume the content stream, + // resulting in the request caller getting an empty response + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(bufferedEntity); + // Effectively restores the content stream of the response + bufferedEntity.getContent().reset(); + + boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); + + LOG.info("Check retryable response result: " + isRetryable); + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. Check result: false"); + return false; + } + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 7d3b79a9e..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future} import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder} import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} import org.apache.http.protocol.HttpContext +import org.apache.http.util.EntityUtils import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.verification.VerificationMode @@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper { @@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } + def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = { + val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) + mockStatic(classOf[EntityUtils]) + when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) + val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getStatusLine.getStatusCode).thenReturn(statusCode) + when(response.getEntity).thenReturn(entity) + when(future.get()).thenReturn(response) + this + } + def shouldExecute(expectExecuteTimes: VerificationMode): Unit = { shouldExecute(expectExecuteTimes, expectExecuteTimes) }
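How the pieces of this patch compose: Failsafe ORs its result predicates, so the policy built in FlintRetryOptions retries when either the status-code predicate or (for AOSS only) the body-based predicate matches. A minimal standalone sketch, assuming Failsafe 3.x and Apache HttpComponents as used above; the 429 check and the max-retries value are illustrative stand-ins for the configurable ones:

import static java.time.temporal.ChronoUnit.SECONDS;

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import org.apache.http.HttpResponse;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;

public class AossRetryExample {
  public static HttpResponse execute(CheckedSupplier<HttpResponse> request) {
    RetryPolicy<HttpResponse> policy = RetryPolicy.<HttpResponse>builder()
        .withBackoff(1, 30, SECONDS)
        .withMaxRetries(3) // illustrative; FlintRetryOptions reads this from options
        // Result predicates are OR'd: retry on a retryable status code...
        .handleResultIf(resp -> resp.getStatusLine().getStatusCode() == 429)
        // ...or on a 400 whose body reports resource_already_exists_exception (AOSS)
        .handleResultIf(new HttpAOSSResultPredicate<>())
        .build();
    return Failsafe.with(policy).get(request);
  }
}

Because HttpAOSSResultPredicate wraps the entity in a BufferedHttpEntity before reading it, the caller still receives a readable response body when no retry is triggered.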
Check result: false"); + return false; + } + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 7d3b79a9e..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future} import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder} import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} import org.apache.http.protocol.HttpContext +import org.apache.http.util.EntityUtils import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.verification.VerificationMode @@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper { @@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } + def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = { + val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) + mockStatic(classOf[EntityUtils]) + when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) + val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getStatusLine.getStatusCode).thenReturn(statusCode) + when(response.getEntity).thenReturn(entity) + when(future.get()).thenReturn(response) + this + } + def shouldExecute(expectExecuteTimes: VerificationMode): Unit = { shouldExecute(expectExecuteTimes, expectExecuteTimes) } From 896fda25104061fbe92948cb932c7a8d31585573 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Tue, 10 Dec 2024 14:31:49 +0800 Subject: [PATCH 2/3] Refactor the Alias node to avoid ambiguity (#975) Signed-off-by: Lantao Jin --- .../java/org/opensearch/sql/ast/expression/Alias.java | 10 +++------- .../opensearch/sql/ppl/CatalystExpressionVisitor.java | 2 +- .../sql/ppl/parser/AstExpressionBuilder.java | 3 +-- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Alias.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Alias.java index 7b3078629..226ff7a8c 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Alias.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/expression/Alias.java @@ -13,27 +13,23 @@ import org.opensearch.sql.ast.AbstractNodeVisitor; /** - * Alias abstraction that associate an 
unnamed expression with a name and an optional alias. The - * name and alias information preserved is useful for semantic analysis and response formatting + * Alias abstraction that associates an unnamed expression with a name. + * The name information preserved is useful for semantic analysis and response formatting * eventually. This can avoid restoring the info in toString() method which is inaccurate because * original info is already lost. */ -@AllArgsConstructor @EqualsAndHashCode(callSuper = false) @Getter @RequiredArgsConstructor @ToString public class Alias extends UnresolvedExpression { - /** Original field name. */ + /** The name to be associated with the result of computing the delegated expression. */ private final String name; /** Expression aliased. */ private final UnresolvedExpression delegated; - /** Optional field alias. */ - private String alias; - @Override public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { return nodeVisitor.visitAlias(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java index 35ac7ed47..bc14ba9d4 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystExpressionVisitor.java @@ -257,7 +257,7 @@ public Expression visitAlias(Alias node, CatalystPlanContext context) { Expression arg = context.popNamedParseExpressions().get(); return context.getNamedParseExpressions().push( org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(arg, - node.getAlias() != null ? node.getAlias() : node.getName(), + node.getName(), NamedExpression.newExprId(), seq(new java.util.ArrayList<Expression>()), Option.empty(), diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 0d5078fa8..1fe57d13e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -383,8 +383,7 @@ public UnresolvedExpression visitBooleanLiteral(OpenSearchPPLParser.BooleanLiter public UnresolvedExpression visitBySpanClause(OpenSearchPPLParser.BySpanClauseContext ctx) { String name = ctx.spanClause().getText(); return ctx.alias != null - ? new Alias( - name, visit(ctx.spanClause()), StringUtils.unquoteIdentifier(ctx.alias.getText())) + ? new Alias(StringUtils.unquoteIdentifier(ctx.alias.getText()), visit(ctx.spanClause())) : new Alias(name, visit(ctx.spanClause())); }
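The call-site effect of the refactor, as a short hypothetical sketch (Literal and DataType are assumed to be the AST helpers from the same org.opensearch.sql.ast.expression package; the values are illustrative):

import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.Literal;

public class AliasExample {
  public static void main(String[] args) {
    Literal one = new Literal(1, DataType.INTEGER);

    // Before #975 the resolved output name was ambiguous:
    //   new Alias("1", one, "total") resolved to "total" (alias wins),
    //   new Alias("1", one)          resolved to "1" (falls back to name).
    // After #975 name is the single source of truth for the output name.
    Alias total = new Alias("total", one);
    System.out.println(total.getName()); // "total", used directly by visitAlias()
  }
}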
From 5052ffe03264e12010239ba1dd6a9deab597cfd7 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Thu, 12 Dec 2024 11:28:40 -0800 Subject: [PATCH 3/3] Add sourceQuery in metadata cache (#988) * add sourceQuery in metadata cache Signed-off-by: Sean Kao * preserve index mapping content when updating cache Signed-off-by: Sean Kao * syntax and comment Signed-off-by: Sean Kao * merge index mapping in place Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .../metadatacache/FlintMetadataCache.scala | 13 +- .../FlintOpenSearchMetadataCacheWriter.scala | 98 ++++----- .../FlintMetadataCacheSuite.scala | 6 +- ...OpenSearchMetadataCacheWriterITSuite.scala | 196 ++++++------------ 4 files changed, 124 insertions(+), 189 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala index 86267c881..c2007c124 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala @@ -23,6 +23,8 @@ case class FlintMetadataCache( refreshInterval: Option[Int], /** Source table names for building the Flint index. */ sourceTables: Array[String], + /** Source query for MV */ + sourceQuery: Option[String], /** Timestamp when Flint index is last refreshed. Unit: milliseconds */ lastRefreshTime: Option[Long]) { @@ -64,6 +66,10 @@ object FlintMetadataCache { case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata) case _ => Array(metadata.source) } + val sourceQuery = metadata.kind match { + case MV_INDEX_TYPE => Some(metadata.source) + case _ => None + } val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry => entry.lastRefreshCompleteTime match { case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None @@ -71,6 +77,11 @@ } } - FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime) + FlintMetadataCache( + metadataCacheVersion, + refreshInterval, + sourceTables, + sourceQuery, + lastRefreshTime) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala index f6fc0ba6f..de3e051fb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala @@ -5,18 +5,18 @@ package org.opensearch.flint.spark.metadatacache -import java.util +import java.util.{HashMap, Map => JMap} import scala.collection.JavaConverters._ import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.client.indices.PutMappingRequest import org.opensearch.common.xcontent.XContentType -import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient} -import 
org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.metadata.FlintJsonHelper._ -import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} +import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.apache.spark.internal.Logging @@ -27,27 +27,20 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends FlintMetadataCacheWriter with Logging { - /** - * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata - * storage, this flag is to allow for preserving index metadata that is already stored in _meta - * when updating metadata cache. - */ - private val includeSpec: Boolean = - FlintIndexMetadataServiceBuilder - .build(options) - .isInstanceOf[FlintOpenSearchIndexMetadataService] - override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = { - logInfo(s"Updating metadata cache for $indexName with $metadata"); + logInfo(s"Updating metadata cache for $indexName"); val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName) var client: IRestHighLevelClient = null try { client = OpenSearchClientUtils.createClient(options) - val request = new PutMappingRequest(osIndexName) - val serialized = serialize(metadata) - logInfo(s"Serialized: $serialized") - request.source(serialized, XContentType.JSON) - client.updateIndexMapping(request, RequestOptions.DEFAULT) + val indexMapping = getIndexMapping(client, osIndexName) + val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava + mergeMetadataCacheProperties(indexMapping, metadataCacheProperties) + val serialized = buildJson(builder => { + builder.field("_meta", indexMapping.get("_meta")) + builder.field("properties", indexMapping.get("properties")) + }) + updateIndexMapping(client, osIndexName, serialized) } catch { case e: Exception => throw new IllegalStateException( @@ -59,50 +52,35 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) } } - /** - * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link - * FlintOpenSearchIndexMetadataService} - */ - private[metadatacache] def serialize(metadata: FlintMetadata): String = { - try { - buildJson(builder => { - objectField(builder, "_meta") { - // If _meta is used as index metadata storage, preserve them. - if (includeSpec) { - builder - .field("version", metadata.version.version) - .field("name", metadata.name) - .field("kind", metadata.kind) - .field("source", metadata.source) - .field("indexedColumns", metadata.indexedColumns) - - if (metadata.latestId.isDefined) { - builder.field("latestId", metadata.latestId.get) - } - optionalObjectField(builder, "options", metadata.options) - } - - optionalObjectField(builder, "properties", buildPropertiesMap(metadata)) - } - builder.field("properties", metadata.schema) - }) - } catch { - case e: Exception => - throw new IllegalStateException("Failed to jsonify cache metadata", e) - } + private[metadatacache] def getIndexMapping( + client: IRestHighLevelClient, + osIndexName: String): JMap[String, AnyRef] = { + val request = new GetIndexRequest(osIndexName) + val response = client.getIndex(request, RequestOptions.DEFAULT) + response.getMappings.get(osIndexName).sourceAsMap() } /** - * Since _meta.properties is shared by both index metadata and metadata cache, here we merge the - * two maps. + * Merge metadata cache properties into index mapping in place. Metadata cache is written into + * _meta.properties field of index mapping. 
*/ - private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = { - val metadataCacheProperties = FlintMetadataCache(metadata).toMap + private def mergeMetadataCacheProperties( + indexMapping: JMap[String, AnyRef], + metadataCacheProperties: JMap[String, AnyRef]): Unit = { + indexMapping + .computeIfAbsent("_meta", _ => new HashMap[String, AnyRef]()) + .asInstanceOf[JMap[String, AnyRef]] + .computeIfAbsent("properties", _ => new HashMap[String, AnyRef]()) + .asInstanceOf[JMap[String, AnyRef]] + .putAll(metadataCacheProperties) + } - if (includeSpec) { - (metadataCacheProperties ++ metadata.properties.asScala).asJava - } else { - metadataCacheProperties.asJava - } + private[metadatacache] def updateIndexMapping( + client: IRestHighLevelClient, + osIndexName: String, + mappingSource: String): Unit = { + val request = new PutMappingRequest(osIndexName) + request.source(mappingSource, XContentType.JSON) + client.updateIndexMapping(request, RequestOptions.DEFAULT) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala index 6ec6cf696..971c18857 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala @@ -83,11 +83,13 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { } it should "construct from materialized view FlintMetadata" in { + val testQuery = + "SELECT 1 FROM spark_catalog.default.test_table UNION SELECT 1 FROM spark_catalog.default.another_table" val content = s""" { | "_meta": { | "kind": "$MV_INDEX_TYPE", - | "source": "spark_catalog.default.wrong_table", + | "source": "$testQuery", | "options": { | "auto_refresh": "true", | "refresh_interval": "10 Minutes" @@ -116,6 +118,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { metadataCache.sourceTables shouldBe Array( "spark_catalog.default.test_table", "spark_catalog.default.another_table") + metadataCache.sourceQuery.get shouldBe testQuery metadataCache.lastRefreshTime.get shouldBe 1234567890123L } @@ -145,6 +148,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion metadataCache.refreshInterval shouldBe empty metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table") + metadataCache.sourceQuery shouldBe empty metadataCache.lastRefreshTime shouldBe empty } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala index 5b4dd0208..692a0c2ff 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala @@ -5,18 +5,17 @@ package org.opensearch.flint.spark.metadatacache -import java.util.{Base64, List} +import java.util.{Base64, Map => JMap} import scala.collection.JavaConverters._ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.{DefaultFormats, Extraction, JValue} import 
org.json4s.native.JsonMethods._ -import org.opensearch.flint.common.FlintVersion.current -import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} -import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite} +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions, FlintSparkSuite} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE @@ -69,63 +68,24 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat } test("build opensearch metadata cache writer") { - setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true") withMetadataCacheWriteEnabled { FlintMetadataCacheWriterBuilder .build(FlintSparkConf()) shouldBe a[FlintOpenSearchMetadataCacheWriter] } } - test("serialize metadata cache to JSON") { - val expectedMetadataJson: String = s""" - | { - | "_meta": { - | "version": "${current()}", - | "name": "$testFlintIndex", - | "kind": "test_kind", - | "source": "$testTable", - | "indexedColumns": [ - | { - | "test_field": "spark_type" - | }], - | "options": { - | "auto_refresh": "true", - | "refresh_interval": "10 Minutes" - | }, - | "properties": { - | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", - | "refreshInterval": 600, - | "sourceTables": ["$testTable"], - | "lastRefreshTime": $testLastRefreshCompleteTime - | }, - | "latestId": "$testLatestId" - | }, - | "properties": { - | "test_field": { - | "type": "os_type" - | } - | } - | } - |""".stripMargin - val builder = new FlintMetadata.Builder - builder.name(testFlintIndex) - builder.kind("test_kind") - builder.source(testTable) - builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) - builder.options( - Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes") - .mapValues(_.asInstanceOf[AnyRef]) - .asJava) - builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) - builder.latestLogEntry(flintMetadataLogEntry) - - val metadata = builder.build() - flintMetadataCacheWriter.serialize(metadata) should matchJson(expectedMetadataJson) - } - test("write metadata cache to index mappings") { + val content = + s""" { + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin val metadata = FlintOpenSearchIndexMetadataService - .deserialize("{}") + .deserialize(content) .copy(latestLogEntry = Some(flintMetadataLogEntry)) flintClient.createIndex(testFlintIndex, metadata) flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) @@ -139,7 +99,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat } Seq(SKIPPING_INDEX_TYPE, COVERING_INDEX_TYPE).foreach { case kind => - test(s"write metadata cache to $kind index mappings with source tables") { + test(s"write metadata cache to $kind index mappings with source tables for non mv index") { val content = s""" { | "_meta": { @@ -164,10 +124,11 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .get("sourceTables") 
.asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array( testTable) + properties should not contain key("sourceQuery") } } - test("write metadata cache with source tables from index metadata") { + test("write metadata cache with source tables and query from mv index metadata") { val mv = FlintSparkMaterializedView( "spark_catalog.default.mv", s"SELECT 1 FROM $testTable", @@ -182,9 +143,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat properties .get("sourceTables") .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(testTable) + properties + .get("sourceQuery") + .asInstanceOf[String] shouldBe s"SELECT 1 FROM $testTable" } - test("write metadata cache with source tables from deserialized metadata") { + test("write metadata cache with source tables and query from deserialized mv metadata") { val testTable2 = "spark_catalog.default.metadatacache_test2" val content = s""" { @@ -272,31 +236,39 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 - properties should contain allOf (Entry( - "metadataCacheVersion", - FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) + properties should not contain key("refreshInterval") } test("exclude last refresh time in metadata cache when index has not been refreshed") { + val content = + s""" { + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin val metadata = FlintOpenSearchIndexMetadataService - .deserialize("{}") + .deserialize(content) .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L))) flintClient.createIndex(testFlintIndex, metadata) flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 2 - properties should contain( - Entry("metadataCacheVersion", FlintMetadataCache.metadataCacheVersion)) + properties should not contain key("lastRefreshTime") } test("write metadata cache to index mappings and preserve other index metadata") { val content = """ { | "_meta": { - | "kind": "test_kind" + | "kind": "test_kind", + | "name": "test_name", + | "custom": "test_custom", + | "properties": { + | "custom_in_properties": "test_custom" + | } | }, | "properties": { | "age": { @@ -311,48 +283,31 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .copy(latestLogEntry = Some(flintMetadataLogEntry)) flintClient.createIndex(testFlintIndex, metadata) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, metadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + // Simulates index mapping updated by custom implementation of FlintIndexMetadataService + // with the extra "custom" field. 
+ val client = OpenSearchClientUtils.createClient(options) + flintMetadataCacheWriter.updateIndexMapping(client, testFlintIndex, content) - flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" - flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe empty - flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - var properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 - properties should contain allOf (Entry( - "metadataCacheVersion", - FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) - - val newContent = - """ { - | "_meta": { - | "kind": "test_kind", - | "name": "test_name" - | }, - | "properties": { - | "age": { - | "type": "integer" - | } - | } - | } - |""".stripMargin - - val newMetadata = FlintOpenSearchIndexMetadataService - .deserialize(newContent) - .copy(latestLogEntry = Some(flintMetadataLogEntry)) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, newMetadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, newMetadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe "test_name" flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties + properties should have size 4 properties should contain allOf (Entry( "metadataCacheVersion", FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) + Entry("lastRefreshTime", testLastRefreshCompleteTime), Entry( + "custom_in_properties", + "test_custom")) + + // Directly get the index mapping and verify custom field is preserved + flintMetadataCacheWriter + .getIndexMapping(client, testFlintIndex) + .get("_meta") + .asInstanceOf[JMap[String, AnyRef]] + .get("custom") shouldBe "test_custom" } Seq( @@ -391,6 +346,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat test(s"write metadata cache for $refreshMode") { withExternalSchedulerEnabled { withMetadataCacheWriteEnabled { + val flint: FlintSpark = new FlintSpark(spark) withTempDir { checkpointDir => // update checkpoint_location if available in optionsMap val indexOptions = FlintSparkIndexOptions( @@ -407,25 +363,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .options(indexOptions, testFlintIndex) .create() - var index = flint.describeIndex(testFlintIndex) - index shouldBe defined - val propertiesJson = - compact( - render( - parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties")) + val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex))) propertiesJson should matchJson(expectedJson) flint.refreshIndex(testFlintIndex) - index = flint.describeIndex(testFlintIndex) - index shouldBe defined val lastRefreshTime = - compact( - render( - parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties" \ "lastRefreshTime")).toLong + compact(render(getPropertiesJValue(testFlintIndex) \ "lastRefreshTime")).toLong lastRefreshTime should be > 0L } } @@ -435,6 +378,7 @@ class 
FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat test("write metadata cache for auto refresh index with internal scheduler") { withMetadataCacheWriteEnabled { + val flint: FlintSpark = new FlintSpark(spark) withTempDir { checkpointDir => flint .skippingIndex() @@ -450,12 +394,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat testFlintIndex) .create() - var index = flint.describeIndex(testFlintIndex) - index shouldBe defined - val propertiesJson = - compact( - render(parse( - flintMetadataCacheWriter.serialize(index.get.metadata())) \ "_meta" \ "properties")) + val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex))) propertiesJson should matchJson(s""" | { | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", @@ -465,12 +404,15 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat |""".stripMargin) flint.refreshIndex(testFlintIndex) - index = flint.describeIndex(testFlintIndex) - index shouldBe defined - compact(render(parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties")) should not include "lastRefreshTime" + compact(render(getPropertiesJValue(testFlintIndex))) should not include "lastRefreshTime" } } } + + private def getPropertiesJValue(indexName: String): JValue = { + // Convert to scala map because json4s converts java.util.Map into an empty JObject + // https://github.com/json4s/json4s/issues/392 + val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties.asScala + Extraction.decompose(properties)(DefaultFormats) + } }
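For reference, the in-place merge performed by mergeMetadataCacheProperties on the fetched index mapping corresponds to the following plain-Java sketch (class name and map contents are illustrative):

import java.util.HashMap;
import java.util.Map;

public class MetaMergeExample {
  @SuppressWarnings("unchecked")
  static void mergeCacheProperties(Map<String, Object> indexMapping,
                                   Map<String, Object> cacheProperties) {
    // Create _meta and _meta.properties if absent, then merge the cache entries in.
    // Everything else in the mapping (custom _meta fields, top-level properties)
    // is left untouched.
    Map<String, Object> meta = (Map<String, Object>)
        indexMapping.computeIfAbsent("_meta", k -> new HashMap<String, Object>());
    Map<String, Object> props = (Map<String, Object>)
        meta.computeIfAbsent("properties", k -> new HashMap<String, Object>());
    props.putAll(cacheProperties);
  }

  public static void main(String[] args) {
    Map<String, Object> mapping = new HashMap<>();
    mapping.put("_meta", new HashMap<>(Map.of("custom", "test_custom")));

    mergeCacheProperties(mapping, Map.of("metadataCacheVersion", "1.0"));
    // _meta keeps "custom" and now also carries properties.metadataCacheVersion
    System.out.println(mapping);
  }
}

This is why the integration test above can assert that custom _meta fields written by another FlintIndexMetadataService implementation survive a metadata cache update.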