From 5dd9fac3f9d7ca67aae85b16814494861c3f59ec Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 28 Nov 2023 01:25:03 -0800 Subject: [PATCH 1/2] Improve Flint Index Creation Reliability by Adjusting Socket Timeout This PR addresses the issue where creating a Flint index could fail due to a socket timeout. We have increased the default socket timeout from 30 (as defined in the OpenSearch RESTful Java client) to 60 seconds. Additionally, we've made the socket timeout value configurable to ensure greater flexibility and adaptability to various network conditions. Test enhancements include: - E2E testing with simulated socket timeout to validate the robustness of index creation under constrained network scenarios. Signed-off-by: Kaituo Li --- docs/index.md | 1 + .../opensearch/flint/core/FlintClient.java | 1 + .../opensearch/flint/core/FlintOptions.java | 8 +++++ .../core/storage/FlintOpenSearchClient.java | 4 +++ .../core/storage/TimeoutConfigurator.java | 29 +++++++++++++++++++ .../sql/flint/config/FlintSparkConf.scala | 10 ++++++- 6 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java diff --git a/docs/index.md b/docs/index.md index 03164e942..ea6778f39 100644 --- a/docs/index.md +++ b/docs/index.md @@ -364,6 +364,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.flint.optimizer.enabled`: default is true. - `spark.flint.index.hybridscan.enabled`: default is false. - `spark.flint.index.checkpoint.mandatory`: default is true. +- `spark.datasource.flint.socket_timeout_millis`: default value is 60000. #### Data Type Mapping diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 6cdf5187d..6dc31838a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -5,6 +5,7 @@ package org.opensearch.flint.core; +import java.security.KeyManagementException; import java.util.List; import org.opensearch.client.RestHighLevelClient; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 8ce3054d9..c1c5491ed 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -73,6 +73,10 @@ public class FlintOptions implements Serializable { */ public static final String DEFAULT_REFRESH_POLICY = "wait_for"; + public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis"; + + public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000; + public FlintOptions(Map options) { this.options = options; this.retryOptions = new FlintRetryOptions(options); @@ -123,4 +127,8 @@ public String getUsername() { public String getPassword() { return options.getOrDefault(PASSWORD, "flint"); } + + public int getSocketTimeoutMillis() { + return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS))); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index e3ac49607..59e030d8e 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -271,6 +271,10 @@ public RestHighLevelClient createClient() { restClientBuilder.setHttpClientConfigCallback(delegate -> RetryableHttpAsyncClient.builder(delegate, options)); } + + final TimeoutConfigurator callback = new TimeoutConfigurator(options); + restClientBuilder.setRequestConfigCallback(callback); + return new RestHighLevelClient(restClientBuilder); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java new file mode 100644 index 000000000..79a3fc77b --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import org.apache.http.client.config.RequestConfig; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.flint.core.FlintOptions; + +/** + * allows override default socket timeout in RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS + */ +public class TimeoutConfigurator implements RestClientBuilder.RequestConfigCallback { + + private final FlintOptions options; + + public TimeoutConfigurator(FlintOptions options) { + this.options = options; + } + + @Override + public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { + // Set the socket timeout in milliseconds + return requestConfigBuilder.setSocketTimeout(options.getSocketTimeoutMillis()); + } +} + diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index c220b4b01..fd998d46d 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -140,6 +140,13 @@ object FlintSparkConf { val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory") .doc("Checkpoint location for incremental refresh index will be mandatory if enabled") .createWithDefault("true") + + val SOCKET_TIMEOUT_MILLIS = + FlintConfig(s"spark.datasource.flint.${FlintOptions.SOCKET_TIMEOUT_MILLIS}") + .datasourceOption() + .doc("socket duration in milliseconds") + .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS)) + } /** @@ -188,7 +195,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, USERNAME, - PASSWORD) + PASSWORD, + SOCKET_TIMEOUT_MILLIS) .map(conf => (conf.optionKey, conf.readFrom(reader))) .toMap From 0a32e7aaaa169bef96eeebf48902ec91b1a3093c Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 28 Nov 2023 10:32:41 -0800 Subject: [PATCH 2/2] rename class and remove unused code Signed-off-by: Kaituo Li --- .../src/main/scala/org/opensearch/flint/core/FlintClient.java | 1 - .../opensearch/flint/core/storage/FlintOpenSearchClient.java | 2 +- .../{TimeoutConfigurator.java => RequestConfigurator.java} | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) rename flint-core/src/main/scala/org/opensearch/flint/core/storage/{TimeoutConfigurator.java => RequestConfigurator.java} (86%) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 6dc31838a..6cdf5187d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -5,7 +5,6 @@ package org.opensearch.flint.core; -import java.security.KeyManagementException; import java.util.List; import org.opensearch.client.RestHighLevelClient; diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 59e030d8e..22badfbf9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -272,7 +272,7 @@ public RestHighLevelClient createClient() { RetryableHttpAsyncClient.builder(delegate, options)); } - final TimeoutConfigurator callback = new TimeoutConfigurator(options); + final RequestConfigurator callback = new RequestConfigurator(options); restClientBuilder.setRequestConfigCallback(callback); return new RestHighLevelClient(restClientBuilder); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/RequestConfigurator.java similarity index 86% rename from flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java rename to flint-core/src/main/scala/org/opensearch/flint/core/storage/RequestConfigurator.java index 79a3fc77b..b704773d3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/TimeoutConfigurator.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/RequestConfigurator.java @@ -12,11 +12,11 @@ /** * allows override default socket timeout in RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS */ -public class TimeoutConfigurator implements RestClientBuilder.RequestConfigCallback { +public class RequestConfigurator implements RestClientBuilder.RequestConfigCallback { private final FlintOptions options; - public TimeoutConfigurator(FlintOptions options) { + public RequestConfigurator(FlintOptions options) { this.options = options; }