Skip to content

Commit

Permalink
Improve Flint Index Creation Reliability by Adjusting Socket Timeout
Browse files Browse the repository at this point in the history
This PR addresses the issue where creating a Flint index could fail due to a socket timeout. We have increased the default socket timeout from 30 (as defined in the OpenSearch RESTful Java client) to 60 seconds. Additionally, we've made the socket timeout value configurable to ensure greater flexibility and adaptability to various network conditions.

Test enhancements include:
- E2E testing with simulated socket timeout to validate the robustness of index creation under constrained network scenarios.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Nov 28, 2023
1 parent 78193f5 commit 5dd9fac
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.

#### Data Type Mapping

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.core;

import java.security.KeyManagementException;
import java.util.List;

import org.opensearch.client.RestHighLevelClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public class FlintOptions implements Serializable {
*/
public static final String DEFAULT_REFRESH_POLICY = "wait_for";

public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -123,4 +127,8 @@ public String getUsername() {
public String getPassword() {
return options.getOrDefault(PASSWORD, "flint");
}

public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ public RestHighLevelClient createClient() {
restClientBuilder.setHttpClientConfigCallback(delegate ->
RetryableHttpAsyncClient.builder(delegate, options));
}

final TimeoutConfigurator callback = new TimeoutConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClient(restClientBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.apache.http.client.config.RequestConfig;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.flint.core.FlintOptions;

/**
* allows override default socket timeout in RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS
*/
public class TimeoutConfigurator implements RestClientBuilder.RequestConfigCallback {

private final FlintOptions options;

public TimeoutConfigurator(FlintOptions options) {
this.options = options;
}

@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
// Set the socket timeout in milliseconds
return requestConfigBuilder.setSocketTimeout(options.getSocketTimeoutMillis());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ object FlintSparkConf {
val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
.doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
.createWithDefault("true")

val SOCKET_TIMEOUT_MILLIS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.SOCKET_TIMEOUT_MILLIS}")
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

}

/**
Expand Down Expand Up @@ -188,7 +195,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD)
PASSWORD,
SOCKET_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand Down

0 comments on commit 5dd9fac

Please sign in to comment.