Merge branch 'main' into ppl_json_delete_expand_support
YANG-DB committed Dec 12, 2024
2 parents fe8d600 + 5052ffe commit 45fc073
Showing 10 changed files with 240 additions and 202 deletions.
FlintRetryOptions.java
@@ -6,8 +6,12 @@
 package org.opensearch.flint.core.http;

 import static java.time.temporal.ChronoUnit.SECONDS;
+import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME;
+import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;
+import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

 import dev.failsafe.RetryPolicy;
+import dev.failsafe.RetryPolicyBuilder;
 import dev.failsafe.event.ExecutionAttemptedEvent;
 import dev.failsafe.function.CheckedPredicate;
 import java.time.Duration;
@@ -16,6 +20,7 @@
 import java.util.logging.Logger;
 import org.opensearch.action.bulk.BulkResponse;
 import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
+import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
 import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
 import java.io.Serializable;
@@ -65,7 +70,7 @@ public boolean isRetryEnabled() {
    * @return Failsafe retry policy
    */
   public <T> RetryPolicy<T> getRetryPolicy() {
-    return RetryPolicy.<T>builder()
+    RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder()
         // Backoff strategy config (can be made configurable as needed in the future)
         .withBackoff(1, 30, SECONDS)
         .withJitter(Duration.ofMillis(100))
@@ -75,8 +80,11 @@ public <T> RetryPolicy<T> getRetryPolicy() {
         .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
         // Logging listener
         .onFailedAttempt(FlintRetryOptions::onFailure)
-        .onRetry(FlintRetryOptions::onRetry)
-        .build();
+        .onRetry(FlintRetryOptions::onRetry);
+    if (SERVICE_NAME_AOSS.equals(getServiceName())) {
+      builder.handleResultIf(new HttpAOSSResultPredicate<>());
+    }
+    return builder.build();
   }

   public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
@@ -101,6 +109,10 @@ private static <T> void onRetry(ExecutionAttemptedEvent<T> event) {
     LOG.warning("Retrying failed request at #" + event.getAttemptCount());
   }

+  private String getServiceName() {
+    return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
+  }
+
   /**
    * @return maximum retry option value
    */
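A note on the pattern above: the change splits the previously chained RetryPolicy.<T>builder()...build() call so that an extra result predicate can be attached only when the service is AOSS. A minimal, self-contained sketch of that conditional composition with Failsafe (the class name, string results, and both predicates here are illustrative, not from this repo):

import static java.time.temporal.ChronoUnit.SECONDS;

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import java.time.Duration;

public class ConditionalRetryPolicySketch {
  public static void main(String[] args) {
    boolean isAoss = true; // stand-in for the SERVICE_NAME_AOSS option check

    // Same shape as the hunk above: keep the builder open, attach the
    // service-specific result predicate only when it applies, then build once.
    RetryPolicyBuilder<String> builder = RetryPolicy.<String>builder()
        .withBackoff(1, 30, SECONDS)
        .withJitter(Duration.ofMillis(100))
        .withMaxRetries(3)
        .handleResultIf(result -> result.contains("retryable"));
    if (isAoss) {
      builder.handleResultIf(result -> result.contains("resource_already_exists_exception"));
    }
    RetryPolicy<String> policy = builder.build();

    System.out.println(Failsafe.with(policy).get(() -> "ok")); // completes without retries
  }
}

Keeping the builder open until every predicate is registered means the shared backoff, jitter, and listener configuration is written once for all service types.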
HttpAOSSResultPredicate.java (new file)
@@ -0,0 +1,66 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.http.handler;
+
+import dev.failsafe.function.CheckedPredicate;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.util.EntityUtils;
+
+import java.util.logging.Logger;
+
+/**
+ * Failure handler based on the HTTP response from AOSS.
+ *
+ * @param <T> result type (expected to be HttpResponse for the OpenSearch client)
+ */
+public class HttpAOSSResultPredicate<T> implements CheckedPredicate<T> {
+
+  private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName());
+
+  public static final int BAD_REQUEST_STATUS_CODE = 400;
+  public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception";
+
+  public HttpAOSSResultPredicate() { }
+
+  @Override
+  public boolean test(T result) throws Throwable {
+    LOG.info("Checking if response is retryable");
+
+    int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode();
+    if (statusCode != BAD_REQUEST_STATUS_CODE) {
+      LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false");
+      return false;
+    }
+
+    HttpResponse response = (HttpResponse) result;
+    HttpEntity entity = response.getEntity();
+    if (entity == null) {
+      LOG.info("No response entity found. Check result: false");
+      return false;
+    }
+
+    // Buffer the entity to make it repeatable, so that this retry check does not consume
+    // the content stream and leave the request caller with an empty response.
+    BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity);
+    response.setEntity(bufferedEntity);
+
+    try {
+      String responseContent = EntityUtils.toString(bufferedEntity);
+      // Effectively restores the content stream of the response
+      bufferedEntity.getContent().reset();
+
+      boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE);
+
+      LOG.info("Check retryable response result: " + isRetryable);
+      return isRetryable;
+    } catch (Exception e) {
+      LOG.info("Unable to parse response body. Check result: false");
+      return false;
+    }
+  }
+}
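The BufferedHttpEntity step above is the subtle part: an HTTP response entity is usually a one-shot stream, so reading it inside a retry predicate would otherwise leave the caller with an empty body. A stand-alone sketch of the same trick with Apache HttpCore (the class name and body text are illustrative):

import java.io.ByteArrayInputStream;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.util.EntityUtils;

public class BufferedEntitySketch {
  public static void main(String[] args) throws Exception {
    // A response whose entity is a one-shot (non-repeatable) stream.
    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, 400, "Bad Request");
    response.setEntity(new InputStreamEntity(
        new ByteArrayInputStream("resource_already_exists_exception".getBytes())));

    // Wrap the entity so its content can be read more than once, and put the wrapper back.
    BufferedHttpEntity buffered = new BufferedHttpEntity(response.getEntity());
    response.setEntity(buffered);

    // First read, as the retry predicate does, then rewind.
    String body = EntityUtils.toString(buffered);
    buffered.getContent().reset();

    // The caller can still read the same content afterwards.
    System.out.println(body.equals(EntityUtils.toString(response.getEntity()))); // true
  }
}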
RetryableHttpAsyncClientSuite.scala
@@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future}

 import scala.collection.JavaConverters.mapAsJavaMapConverter

+import org.apache.http.HttpEntity
 import org.apache.http.HttpResponse
 import org.apache.http.concurrent.FutureCallback
 import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder}
 import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer}
 import org.apache.http.protocol.HttpContext
+import org.apache.http.util.EntityUtils
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.verification.VerificationMode
@@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
       expectFutureGetTimes = times(0))
   }

+  it should "retry if AOSS response is retryable" in {
+    retryableClient
+      .withOption("auth.servicename", "aoss")
+      .whenResponse(
+        400,
+        "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
+      .shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
+  }
+
+  it should "not apply retry policy for AOSS response if service is not AOSS" in {
+    retryableClient
+      .whenResponse(
+        400,
+        "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
+      .shouldExecute(times(1))
+  }
+
   private def retryableClient: AssertionHelper = new AssertionHelper

   class AssertionHelper {
@@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
       this
     }

+    def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = {
+      val entity = mock[HttpEntity](RETURNS_DEEP_STUBS)
+      mockStatic(classOf[EntityUtils])
+      when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage)
+      val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
+      when(response.getStatusLine.getStatusCode).thenReturn(statusCode)
+      when(response.getEntity).thenReturn(entity)
+      when(future.get()).thenReturn(response)
+      this
+    }
+
     def shouldExecute(expectExecuteTimes: VerificationMode): Unit = {
       shouldExecute(expectExecuteTimes, expectExecuteTimes)
     }
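The whenResponse helper above stubs the static EntityUtils.toString call via Mockito's mockStatic, which requires the inline mock maker (the mockito-inline artifact or equivalent configuration). A hedged Java sketch of the same stubbing in a scoped form (the class name and stubbed value are illustrative):

import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class StaticMockSketch {
  public static void main(String[] args) throws Exception {
    HttpEntity entity = Mockito.mock(HttpEntity.class);
    // Scope the static mock so EntityUtils behaves normally once the block exits.
    try (MockedStatic<EntityUtils> mocked = Mockito.mockStatic(EntityUtils.class)) {
      mocked.when(() -> EntityUtils.toString(Mockito.any(HttpEntity.class)))
          .thenReturn("resource_already_exists_exception");
      System.out.println(EntityUtils.toString(entity)); // prints the stubbed body
    }
  }
}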
FlintMetadataCache.scala
@@ -23,6 +23,8 @@ case class FlintMetadataCache(
     refreshInterval: Option[Int],
     /** Source table names for building the Flint index. */
     sourceTables: Array[String],
+    /** Source query for MV */
+    sourceQuery: Option[String],
     /** Timestamp when Flint index is last refreshed. Unit: milliseconds */
     lastRefreshTime: Option[Long]) {

@@ -64,13 +66,22 @@ object FlintMetadataCache {
       case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
       case _ => Array(metadata.source)
     }
+    val sourceQuery = metadata.kind match {
+      case MV_INDEX_TYPE => Some(metadata.source)
+      case _ => None
+    }
     val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
       entry.lastRefreshCompleteTime match {
         case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None
         case timestamp => Some(timestamp)
       }
     }

-    FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime)
+    FlintMetadataCache(
+      metadataCacheVersion,
+      refreshInterval,
+      sourceTables,
+      sourceQuery,
+      lastRefreshTime)
   }
 }
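The new sourceQuery field reuses an existing convention: for a materialized view, metadata.source already holds the MV's defining query rather than a table name, so the cache can expose it directly. A small sketch of that derivation in isolation (the class name and the literal value of MV_INDEX_TYPE are assumptions, not taken from the repo):

import java.util.Optional;

public class SourceQuerySketch {
  // Assumed literal; the real MV_INDEX_TYPE constant lives in the Flint codebase.
  static final String MV_INDEX_TYPE = "mv";

  // Mirrors the match on metadata.kind above: only MV metadata carries its
  // defining query in the "source" field; other kinds carry a table name there.
  static Optional<String> sourceQuery(String kind, String source) {
    return MV_INDEX_TYPE.equals(kind) ? Optional.of(source) : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(sourceQuery(MV_INDEX_TYPE, "SELECT 1 FROM spark_catalog.default.test_table"));
    System.out.println(sourceQuery("skipping", "spark_catalog.default.test_table")); // Optional.empty
  }
}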
FlintOpenSearchMetadataCacheWriter.scala
@@ -5,18 +5,18 @@

 package org.opensearch.flint.spark.metadatacache

-import java.util
+import java.util.{HashMap, Map => JMap}

 import scala.collection.JavaConverters._

 import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.client.indices.PutMappingRequest
 import org.opensearch.common.xcontent.XContentType
-import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
+import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
-import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
 import org.opensearch.flint.core.metadata.FlintJsonHelper._
-import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
+import org.opensearch.flint.core.storage.OpenSearchClientUtils

 import org.apache.spark.internal.Logging

@@ -27,27 +27,20 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     extends FlintMetadataCacheWriter
     with Logging {

-  /**
-   * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata
-   * storage, this flag is to allow for preserving index metadata that is already stored in _meta
-   * when updating metadata cache.
-   */
-  private val includeSpec: Boolean =
-    FlintIndexMetadataServiceBuilder
-      .build(options)
-      .isInstanceOf[FlintOpenSearchIndexMetadataService]
-
   override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
-    logInfo(s"Updating metadata cache for $indexName with $metadata");
+    logInfo(s"Updating metadata cache for $indexName");
     val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
     var client: IRestHighLevelClient = null
     try {
       client = OpenSearchClientUtils.createClient(options)
-      val request = new PutMappingRequest(osIndexName)
-      val serialized = serialize(metadata)
-      logInfo(s"Serialized: $serialized")
-      request.source(serialized, XContentType.JSON)
-      client.updateIndexMapping(request, RequestOptions.DEFAULT)
+      val indexMapping = getIndexMapping(client, osIndexName)
+      val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava
+      mergeMetadataCacheProperties(indexMapping, metadataCacheProperties)
+      val serialized = buildJson(builder => {
+        builder.field("_meta", indexMapping.get("_meta"))
+        builder.field("properties", indexMapping.get("properties"))
+      })
+      updateIndexMapping(client, osIndexName, serialized)
     } catch {
       case e: Exception =>
         throw new IllegalStateException(
@@ -59,50 +52,35 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     }
   }

-  /**
-   * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
-   * FlintOpenSearchIndexMetadataService}
-   */
-  private[metadatacache] def serialize(metadata: FlintMetadata): String = {
-    try {
-      buildJson(builder => {
-        objectField(builder, "_meta") {
-          // If _meta is used as index metadata storage, preserve them.
-          if (includeSpec) {
-            builder
-              .field("version", metadata.version.version)
-              .field("name", metadata.name)
-              .field("kind", metadata.kind)
-              .field("source", metadata.source)
-              .field("indexedColumns", metadata.indexedColumns)
-
-            if (metadata.latestId.isDefined) {
-              builder.field("latestId", metadata.latestId.get)
-            }
-            optionalObjectField(builder, "options", metadata.options)
-          }
-
-          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
-        }
-        builder.field("properties", metadata.schema)
-      })
-    } catch {
-      case e: Exception =>
-        throw new IllegalStateException("Failed to jsonify cache metadata", e)
-    }
-  }
+  private[metadatacache] def getIndexMapping(
+      client: IRestHighLevelClient,
+      osIndexName: String): JMap[String, AnyRef] = {
+    val request = new GetIndexRequest(osIndexName)
+    val response = client.getIndex(request, RequestOptions.DEFAULT)
+    response.getMappings.get(osIndexName).sourceAsMap()
+  }

   /**
-   * Since _meta.properties is shared by both index metadata and metadata cache, here we merge the
-   * two maps.
+   * Merge metadata cache properties into index mapping in place. Metadata cache is written into
+   * _meta.properties field of index mapping.
    */
-  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
-    val metadataCacheProperties = FlintMetadataCache(metadata).toMap
+  private def mergeMetadataCacheProperties(
+      indexMapping: JMap[String, AnyRef],
+      metadataCacheProperties: JMap[String, AnyRef]): Unit = {
+    indexMapping
+      .computeIfAbsent("_meta", _ => new HashMap[String, AnyRef]())
+      .asInstanceOf[JMap[String, AnyRef]]
+      .computeIfAbsent("properties", _ => new HashMap[String, AnyRef]())
+      .asInstanceOf[JMap[String, AnyRef]]
+      .putAll(metadataCacheProperties)
+  }

-    if (includeSpec) {
-      (metadataCacheProperties ++ metadata.properties.asScala).asJava
-    } else {
-      metadataCacheProperties.asJava
-    }
-  }
+  private[metadatacache] def updateIndexMapping(
+      client: IRestHighLevelClient,
+      osIndexName: String,
+      mappingSource: String): Unit = {
+    val request = new PutMappingRequest(osIndexName)
+    request.source(mappingSource, XContentType.JSON)
+    client.updateIndexMapping(request, RequestOptions.DEFAULT)
+  }
 }
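The rewritten writer now follows a read-merge-write cycle: fetch the live index mapping, graft the cache entries under _meta.properties while leaving the rest of _meta untouched, and PUT the merged mapping back. The merge itself is plain java.util.Map surgery; a self-contained sketch of the same idea (map keys and values are illustrative, not the real cache schema):

import java.util.HashMap;
import java.util.Map;

public class MergeMappingSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // Stand-in for the mapping fetched from the index.
    Map<String, Object> indexMapping = new HashMap<>();

    // Stand-in for FlintMetadataCache(metadata).toMap; keys are illustrative.
    Map<String, Object> cacheProperties = new HashMap<>();
    cacheProperties.put("lastRefreshTime", 1234567890123L);

    // Same merge as mergeMetadataCacheProperties: create _meta and
    // _meta.properties on demand, then overlay the cache entries in place.
    Map<String, Object> meta = (Map<String, Object>) indexMapping
        .computeIfAbsent("_meta", k -> new HashMap<String, Object>());
    Map<String, Object> properties = (Map<String, Object>) meta
        .computeIfAbsent("properties", k -> new HashMap<String, Object>());
    properties.putAll(cacheProperties);

    System.out.println(indexMapping); // {_meta={properties={lastRefreshTime=1234567890123}}}
  }
}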
FlintMetadataCacheSuite.scala
@@ -83,11 +83,13 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
   }

   it should "construct from materialized view FlintMetadata" in {
+    val testQuery =
+      "SELECT 1 FROM spark_catalog.default.test_table UNION SELECT 1 FROM spark_catalog.default.another_table"
     val content =
       s""" {
          |   "_meta": {
          |     "kind": "$MV_INDEX_TYPE",
-         |     "source": "spark_catalog.default.wrong_table",
+         |     "source": "$testQuery",
          |     "options": {
          |       "auto_refresh": "true",
          |       "refresh_interval": "10 Minutes"
@@ -116,6 +118,7 @@
     metadataCache.sourceTables shouldBe Array(
       "spark_catalog.default.test_table",
       "spark_catalog.default.another_table")
+    metadataCache.sourceQuery.get shouldBe testQuery
     metadataCache.lastRefreshTime.get shouldBe 1234567890123L
   }

@@ -145,6 +148,7 @@
     metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
     metadataCache.refreshInterval shouldBe empty
     metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
+    metadataCache.sourceQuery shouldBe empty
     metadataCache.lastRefreshTime shouldBe empty
   }
 }
(Diffs for the remaining 4 changed files are not shown.)