Add exception class name to Spark conf
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Nov 22, 2023
1 parent a527190 commit 948aea6
Showing 3 changed files with 45 additions and 25 deletions.
FlintRetryOptions.java

@@ -72,11 +72,11 @@ public <T> RetryPolicy<T> getRetryPolicy() {
         // Backoff strategy config (can be configurable as needed in future)
         .withBackoff(1, 30, SECONDS)
         .withJitter(Duration.ofMillis(100))
-        // Failure handling config configured in Flint options
+        // Failure handling config from Flint options
         .withMaxRetries(getMaxRetries())
         .handleIf(getRetryableExceptionHandler())
         .handleResultIf(getRetryableResultHandler())
-        // Logging config
+        // Logging listener
         .onFailedAttempt(ex ->
             LOG.log(SEVERE, "Attempt to execute request failed", ex.getLastException()))
         .onRetry(ex ->
@@ -159,10 +159,8 @@ private Class<? extends Throwable> loadClass(String className) {
   public String toString() {
     return "FlintRetryOptions{" +
         "maxRetries=" + getMaxRetries() +
-        ", retryableStatusCodes=" +
-        options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES) +
-        ", retryableExceptionClassNames=" +
-        options.getOrDefault(RETRYABLE_EXCEPTION_CLASS_NAMES, "") +
+        ", retryableStatusCodes=" + getRetryableHttpStatusCodes() +
+        ", retryableExceptionClassNames=" + getRetryableExceptionClassNames() +
         '}';
   }
 }
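The retry policy above feeds a retryable-exception predicate (getRetryableExceptionHandler) into Failsafe's handleIf, and the second hunk header shows FlintRetryOptions already has a loadClass helper for resolving configured class names. As a rough illustration only, not the actual FlintRetryOptions code, here is a minimal Scala sketch of how a comma-separated retry.exception_class_names value could be resolved into classes and matched against a thrown exception; the object name, method names, and the cause-chain walk are assumptions made for the example.

object RetryableExceptionSketch {

  // Resolve "java.net.ConnectException,java.net.SocketTimeoutException"
  // into Throwable subclasses.
  def parseClassNames(classNames: String): Seq[Class[_ <: Throwable]] =
    classNames
      .split(",")
      .toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(name => Class.forName(name).asSubclass(classOf[Throwable]))

  // Treat a throwable as retryable if it, or anything in its cause chain,
  // is an instance of one of the configured classes (cause-chain matching
  // is an assumption of this sketch).
  def isRetryable(t: Throwable, classes: Seq[Class[_ <: Throwable]]): Boolean = {
    @annotation.tailrec
    def loop(cur: Throwable): Boolean =
      if (cur == null) false
      else if (classes.exists(_.isInstance(cur))) true
      else loop(cur.getCause)
    loop(t)
  }
}

A predicate with this shape is what handleIf consumes; the exact matching rules live in FlintRetryOptions itself.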
FlintSparkConf.scala

@@ -8,7 +8,7 @@ package org.apache.spark.sql.flint.config
 import java.util
 import java.util.{Map => JMap, NoSuchElementException}
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.http.FlintRetryOptions
@@ -175,23 +175,31 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
    * Helper class, create {@link FlintOptions}.
    */
   def flintOptions(): FlintOptions = {
-    new FlintOptions(
-      Seq(
-        HOST_ENDPOINT,
-        HOST_PORT,
-        REFRESH_POLICY,
-        SCROLL_SIZE,
-        SCROLL_DURATION,
-        SCHEME,
-        AUTH,
-        MAX_RETRIES,
-        RETRYABLE_HTTP_STATUS_CODES, // TODO: add optional exception class name option
-        REGION,
-        CUSTOM_AWS_CREDENTIALS_PROVIDER,
-        USERNAME,
-        PASSWORD)
-        .map(conf => (conf.optionKey, conf.readFrom(reader)))
-        .toMap
-        .asJava)
+    val optionsWithDefault = Seq(
+      HOST_ENDPOINT,
+      HOST_PORT,
+      REFRESH_POLICY,
+      SCROLL_SIZE,
+      SCROLL_DURATION,
+      SCHEME,
+      AUTH,
+      MAX_RETRIES,
+      RETRYABLE_HTTP_STATUS_CODES,
+      REGION,
+      CUSTOM_AWS_CREDENTIALS_PROVIDER,
+      USERNAME,
+      PASSWORD)
+      .map(conf => (conf.optionKey, conf.readFrom(reader)))
+      .toMap
+
+    val optionsWithoutDefault = Seq(RETRYABLE_EXCEPTION_CLASS_NAMES)
+      .map(conf => (conf.optionKey, conf.readFrom(reader)))
+      .flatMap {
+        case (_, None) => None
+        case (key, value) => Some(key, value.get)
+      }
+      .toMap
+
+    new FlintOptions((optionsWithDefault ++ optionsWithoutDefault).asJava)
   }
 }
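The point of the refactored flintOptions() above is the split between the two sequences: options with a default always contribute a key, while the new RETRYABLE_EXCEPTION_CLASS_NAMES entry is dropped from the map entirely when unset, so downstream code can distinguish "not configured" from an empty value (the existing test below asserts Optional.empty in that case). A stand-alone sketch of the same merge pattern, using plain string keys instead of the real FlintConfig entries:

import scala.collection.JavaConverters.mapAsJavaMapConverter

object OptionMergeSketch extends App {
  // Options with a default always produce an entry.
  val withDefault: Map[String, String] = Map("retry.max_retries" -> "3")

  // Options without a default produce an entry only when actually set.
  val exceptionClassNames: Option[String] = None
  val withoutDefault: Map[String, String] =
    Seq("retry.exception_class_names" -> exceptionClassNames)
      .flatMap { case (key, value) => value.map(key -> _) }
      .toMap

  // Only keys that were set reach the Java map handed to FlintOptions.
  val merged: java.util.Map[String, String] = (withDefault ++ withoutDefault).asJava
  println(merged) // {retry.max_retries=3}
}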
FlintSparkConfSuite.scala

@@ -48,6 +48,20 @@ class FlintSparkConfSuite extends FlintSuite {
     retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty
   }
 
+  test("test specified retry options") {
+    val retryOptions = FlintSparkConf(
+      Map(
+        "retry.max_retries" -> "5",
+        "retry.http_status_codes" -> "429,502,503,504",
+        "retry.exception_class_names" -> "java.net.ConnectException").asJava)
+      .flintOptions()
+      .getRetryOptions
+
+    retryOptions.getMaxRetries shouldBe 5
+    retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504"
+    retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
+  }
+
   /**
    * Delete index `indexNames` after calling `f`.
    */
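For context, a usage sketch of how an application might set these retry options on a SparkSession. The spark.datasource.flint. key prefix and the comma-separated class-name list are assumptions about FlintSparkConf rather than something shown in this diff (the test above passes the un-prefixed keys directly):

import org.apache.spark.sql.SparkSession

object FlintRetryConfExample extends App {
  val spark = SparkSession.builder()
    .appName("flint-retry-conf-example")
    .master("local[*]")
    .config("spark.datasource.flint.retry.max_retries", "5")
    .config("spark.datasource.flint.retry.http_status_codes", "429,502,503,504")
    // Assumed: a comma-separated list of fully qualified exception class names.
    .config("spark.datasource.flint.retry.exception_class_names",
      "java.net.ConnectException,java.net.SocketTimeoutException")
    .getOrCreate()

  // ... run Flint reads/writes; matching transient exceptions would be retried ...
  spark.stop()
}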
