Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Flint Index Creation Reliability by Adjusting Socket Timeout #177

Merged
merged 2 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.

#### Data Type Mapping

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public class FlintOptions implements Serializable {
*/
public static final String DEFAULT_REFRESH_POLICY = "wait_for";

public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -123,4 +127,8 @@ public String getUsername() {
public String getPassword() {
return options.getOrDefault(PASSWORD, "flint");
}

public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ public RestHighLevelClient createClient() {
restClientBuilder.setHttpClientConfigCallback(delegate ->
RetryableHttpAsyncClient.builder(delegate, options));
}

final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • does it impact default requestConfigBuilder? e.g. DEFAULT_CONNECT_TIMEOUT_MILLIS?
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
    .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
    .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
if (requestConfigCallback != null) {
    requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it won't as customizeRequestConfig returns the same builder.

public class RequestConfigurator implements RestClientBuilder.RequestConfigCallback {
...
@Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
        // Set the socket timeout in milliseconds
        return requestConfigBuilder.setSocketTimeout(options.getSocketTimeoutMillis());
    }

public class RequestConfig implements Cloneable {
...
        public Builder setSocketTimeout(final int socketTimeout) {
            this.socketTimeout = socketTimeout;
            return this;
        }
...


return new RestHighLevelClient(restClientBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.apache.http.client.config.RequestConfig;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.flint.core.FlintOptions;

/**
* allows override default socket timeout in RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS
*/
public class RequestConfigurator implements RestClientBuilder.RequestConfigCallback {

private final FlintOptions options;

public RequestConfigurator(FlintOptions options) {
this.options = options;
}

@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
// Set the socket timeout in milliseconds
return requestConfigBuilder.setSocketTimeout(options.getSocketTimeoutMillis());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ object FlintSparkConf {
val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
.doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
.createWithDefault("true")

val SOCKET_TIMEOUT_MILLIS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.SOCKET_TIMEOUT_MILLIS}")
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

}

/**
Expand Down Expand Up @@ -188,7 +195,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD)
PASSWORD,
SOCKET_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand Down
Loading