Skip to content

Commit

Permalink
Add Spark conf and user manual
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 22, 2023
1 parent 948fe93 commit a527190
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
3 changes: 3 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema i
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: default value is 3. Maximum retries on a failed HTTP request; 0 means retry is disabled.
- `spark.datasource.flint.retry.http_status_codes`: default value is "429,502". Retryable HTTP response status code list.
- `spark.datasource.flint.retry.exception_class_names`: Retryable exception class name list; by default no retry on any exception thrown.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.logging.Logger;
Expand Down Expand Up @@ -82,17 +83,31 @@ public <T> RetryPolicy<T> getRetryPolicy() {
LOG.warning("Retrying failed request at #" + ex.getAttemptCount())).build();
}

private int getMaxRetries() {
/**
* @return maximum retry option value
*/
public int getMaxRetries() {
return Integer.parseInt(
options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES)));
}

/**
 * @return retryable HTTP status code list
 */
public String getRetryableHttpStatusCodes() {
  // Use the built-in status code list unless the user supplied one.
  String defaultCodes = DEFAULT_RETRYABLE_HTTP_STATUS_CODES;
  return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, defaultCodes);
}

/**
 * Get the configured retryable exception class name list, if any.
 *
 * <p>A missing or blank option value is treated as "not configured", preserving
 * the previous {@code exceptionClassNames == null || exceptionClassNames.isEmpty()}
 * behavior so callers never attempt to load an empty class name.
 *
 * @return retryable exception class name list, or empty when not configured
 */
public Optional<String> getRetryableExceptionClassNames() {
  return Optional
      .ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES))
      // Treat a blank value the same as unset (matches pre-Optional guard).
      .filter(names -> !names.trim().isEmpty());
}

private <T> CheckedPredicate<T> getRetryableResultHandler() {
Set<Integer> retryableStatusCodes =
Arrays.stream(
options.getOrDefault(
RETRYABLE_HTTP_STATUS_CODES,
DEFAULT_RETRYABLE_HTTP_STATUS_CODES).split(","))
Arrays.stream(getRetryableHttpStatusCodes().split(","))
.map(String::trim)
.map(Integer::valueOf)
.collect(Collectors.toSet());
Expand All @@ -103,14 +118,14 @@ private <T> CheckedPredicate<T> getRetryableResultHandler() {

private CheckedPredicate<? extends Throwable> getRetryableExceptionHandler() {
// By default, Failsafe handles any Exception
String exceptionClassNames = options.get(RETRYABLE_EXCEPTION_CLASS_NAMES);
if (exceptionClassNames == null || exceptionClassNames.isEmpty()) {
Optional<String> exceptionClassNames = getRetryableExceptionClassNames();
if (exceptionClassNames.isEmpty()) {
return ex -> false;
}

// Use weak collection avoids blocking class unloading
Set<Class<? extends Throwable>> retryableExceptions = newSetFromMap(new WeakHashMap<>());
Arrays.stream(exceptionClassNames.split(","))
Arrays.stream(exceptionClassNames.get().split(","))
.map(String::trim)
.map(this::loadClass)
.forEach(retryableExceptions::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.{Map => JMap, NoSuchElementException}
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.sql.flint.config.FlintSparkConf._
Expand Down Expand Up @@ -111,6 +112,23 @@ object FlintSparkConf {
.doc("scroll duration in minutes")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SCROLL_DURATION))

// Maximum retry attempts for a failed HTTP request; 0 disables retry entirely.
val MAX_RETRIES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.MAX_RETRIES}")
.datasourceOption()
.doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3")
.createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_MAX_RETRIES))

// Comma-separated HTTP response status codes that should trigger a retry.
val RETRYABLE_HTTP_STATUS_CODES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}")
.datasourceOption()
.doc("retryable HTTP response status code list")
.createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES)

// Comma-separated exception class names that should trigger a retry.
// Optional: when unset, no retry happens on any thrown exception.
val RETRYABLE_EXCEPTION_CLASS_NAMES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}")
.datasourceOption()
.doc("retryable exception class name list, by default no retry on exception thrown")
.createOptional()

val OPTIMIZER_RULE_ENABLED = FlintConfig("spark.flint.optimizer.enabled")
.doc("Enable Flint optimizer rule for query rewrite with Flint index")
.createWithDefault("true")
Expand Down Expand Up @@ -166,6 +184,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
SCROLL_DURATION,
SCHEME,
AUTH,
MAX_RETRIES,
RETRYABLE_HTTP_STATUS_CODES, // TODO: add optional exception class name option
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@

package org.apache.spark.sql.flint.config

import java.util.Optional

import scala.collection.JavaConverters._

import org.opensearch.flint.core.http.FlintRetryOptions._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite

class FlintSparkConfSuite extends FlintSuite {
Expand Down Expand Up @@ -36,6 +41,13 @@ class FlintSparkConfSuite extends FlintSuite {
assert(options.flintOptions().getPort == 9200)
}

test("test retry options default values") {
  // No retry-related Spark conf is set, so FlintRetryOptions defaults must apply.
  val retry = FlintSparkConf().flintOptions().getRetryOptions
  retry.getRetryableExceptionClassNames shouldBe Optional.empty
  retry.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES
  retry.getMaxRetries shouldBe DEFAULT_MAX_RETRIES
}

/**
* Delete index `indexNames` after calling `f`.
*/
Expand Down

0 comments on commit a527190

Please sign in to comment.