diff --git a/docs/index.md b/docs/index.md
index d0228cceb..3c3f02dc5 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -358,6 +358,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
+- `spark.datasource.flint.retry.max_retries`: default value is 3. max retries on failed HTTP request. 0 means retry is disabled.
+- `spark.datasource.flint.retry.http_status_codes`: default value is "429,502". retryable HTTP response status code list.
+- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
 - `spark.flint.optimizer.enabled`: default is true.
 - `spark.flint.index.hybridscan.enabled`: default is false.
 - `spark.flint.index.checkpoint.mandatory`: default is true.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java
index 5e41abd94..616723207 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java
@@ -15,6 +15,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.logging.Logger;
@@ -82,17 +83,31 @@ public RetryPolicy getRetryPolicy() {
             LOG.warning("Retrying failed request at #" + ex.getAttemptCount())).build();
   }
 
-  private int getMaxRetries() {
+  /**
+   * @return maximum retry option value
+   */
+  public int getMaxRetries() {
     return Integer.parseInt(
         options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES)));
   }
 
+  /**
+   * @return retryable HTTP status code list
+   */
+  public String getRetryableHttpStatusCodes() {
+    return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
+  }
+
+  /**
+   * @return retryable exception class name list
+   */
+  public Optional<String> getRetryableExceptionClassNames() {
+    return Optional.ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES));
+  }
+
   private CheckedPredicate getRetryableResultHandler() {
     Set<Integer> retryableStatusCodes =
-        Arrays.stream(
-            options.getOrDefault(
-                RETRYABLE_HTTP_STATUS_CODES,
-                DEFAULT_RETRYABLE_HTTP_STATUS_CODES).split(","))
+        Arrays.stream(getRetryableHttpStatusCodes().split(","))
             .map(String::trim)
             .map(Integer::valueOf)
             .collect(Collectors.toSet());
@@ -103,14 +118,14 @@ private CheckedPredicate getRetryableResultHandler() {
 
   private CheckedPredicate getRetryableExceptionHandler() {
     // By default, Failsafe handles any Exception
-    String exceptionClassNames = options.get(RETRYABLE_EXCEPTION_CLASS_NAMES);
-    if (exceptionClassNames == null || exceptionClassNames.isEmpty()) {
+    Optional<String> exceptionClassNames = getRetryableExceptionClassNames();
+    if (exceptionClassNames.isEmpty()) {
       return ex -> false;
     }
 
     // Use weak collection avoids blocking class unloading
     Set<Class<? extends Throwable>> retryableExceptions = newSetFromMap(new WeakHashMap<>());
-    Arrays.stream(exceptionClassNames.split(","))
+    Arrays.stream(exceptionClassNames.get().split(","))
         .map(String::trim)
         .map(this::loadClass)
         .forEach(retryableExceptions::add);
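For reviewers, a minimal sketch of how the new public getters behave. Two assumptions, not verified against this change: that `FlintRetryOptions` exposes a public constructor over the raw option map (as its `options.getOrDefault` calls suggest), and that the key constants resolve to the unprefixed `retry.*` names documented above.

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.http.FlintRetryOptions

object RetryOptionsSketch extends App {
  // Assumption: constructor accepts the raw option map, keyed by the
  // unprefixed names ("retry.max_retries", "retry.http_status_codes").
  val retryOptions = new FlintRetryOptions(
    Map(
      "retry.max_retries" -> "5",
      "retry.http_status_codes" -> "429,502,503").asJava)

  assert(retryOptions.getMaxRetries == 5)
  assert(retryOptions.getRetryableHttpStatusCodes == "429,502,503")
  // No exception class names configured, so nothing is retried on exception.
  assert(!retryOptions.getRetryableExceptionClassNames.isPresent)
}
```

Returning `Optional` for the exception class names keeps the "no retryable exceptions by default" case explicit, which is exactly what `getRetryableExceptionHandler` branches on above.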
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 2c42f9f20..75feda832 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -11,6 +11,7 @@ import java.util.{Map => JMap, NoSuchElementException}
 import scala.collection.JavaConverters._
 
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.http.FlintRetryOptions
 
 import org.apache.spark.internal.config.ConfigReader
 import org.apache.spark.sql.flint.config.FlintSparkConf._
@@ -111,6 +112,23 @@
     .doc("scroll duration in minutes")
     .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION))
 
+  val MAX_RETRIES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.MAX_RETRIES}")
+    .datasourceOption()
+    .doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3")
+    .createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_MAX_RETRIES))
+
+  val RETRYABLE_HTTP_STATUS_CODES =
+    FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}")
+      .datasourceOption()
+      .doc("retryable HTTP response status code list")
+      .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES)
+
+  val RETRYABLE_EXCEPTION_CLASS_NAMES =
+    FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}")
+      .datasourceOption()
+      .doc("retryable exception class name list, by default no retry on exception thrown")
+      .createOptional()
+
   val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled")
     .doc("Enable Flint optimizer rule for query rewrite with Flint index")
     .createWithDefault("true")
@@ -166,6 +184,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
       SCROLL_DURATION,
       SCHEME,
       AUTH,
+      MAX_RETRIES,
+      RETRYABLE_HTTP_STATUS_CODES, // TODO: add optional exception class name option
       REGION,
       CUSTOM_AWS_CREDENTIALS_PROVIDER,
       USERNAME,
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
index c15cf1073..0738d2e5e 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
@@ -5,8 +5,13 @@
 
 package org.apache.spark.sql.flint.config
 
+import java.util.Optional
+
 import scala.collection.JavaConverters._
 
+import org.opensearch.flint.core.http.FlintRetryOptions._
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
 import org.apache.spark.FlintSuite
 
 class FlintSparkConfSuite extends FlintSuite {
@@ -36,6 +41,13 @@ class FlintSparkConfSuite extends FlintSuite {
     assert(options.flintOptions().getPort == 9200)
   }
 
+  test("test retry options default values") {
+    val retryOptions = FlintSparkConf().flintOptions().getRetryOptions
+    retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES
+    retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES
+    retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty
+  }
+
   /**
    * Delete index `indexNames` after calling `f`.
    */
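End to end, these options surface through the Spark session configuration that `FlintSparkConf` reads. A hedged usage sketch: only the option keys come from this change (see docs/index.md above); the session setup, app name, and chosen values are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object RetryConfigExample extends App {
  // Illustrative local session; any existing session works the same way.
  val spark = SparkSession.builder()
    .appName("flint-retry-example")
    .master("local[*]")
    .getOrCreate()

  // Retry up to 5 times on throttling (429) and gateway errors (502, 503).
  spark.conf.set("spark.datasource.flint.retry.max_retries", "5")
  spark.conf.set("spark.datasource.flint.retry.http_status_codes", "429,502,503")

  // Opt in to retrying a specific exception type; by default none are retried.
  spark.conf.set(
    "spark.datasource.flint.retry.exception_class_names",
    "java.net.ConnectException")
}
```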