Skip to content

Commit

Permalink
Add jitter for remote download calls
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Sep 24, 2024
1 parent 1343367 commit 5735031
Show file tree
Hide file tree
Showing 44 changed files with 572 additions and 195 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- [Remote Publication] Add jitter for remote publication download calls ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
Expand Down Expand Up @@ -239,7 +240,9 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett
RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy() : BackoffStrategy.none()
clientSettings.throttleRetries
? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)
: BackoffStrategy.none()
)
.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.http.SystemPropertyTlsKeyManagersProvider;
Expand Down Expand Up @@ -330,6 +331,8 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett
);
if (!clientSettings.throttleRetries) {
retryPolicy.throttlingBackoffStrategy(BackoffStrategy.none());
} else {
retryPolicy.throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD));
}
return clientOverrideConfiguration.retryPolicy(retryPolicy.build()).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.http.apache.ProxyConfiguration;

Expand Down Expand Up @@ -364,7 +365,7 @@ private void launchAWSConfigurationTest(
if (expectedUseThrottleRetries) {
assertThat(
clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(),
is(BackoffStrategy.defaultThrottlingStrategy())
is(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD))
);
} else {
assertThat(clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(), is(BackoffStrategy.none()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;

import org.opensearch.cli.SuppressForbidden;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.MockSecureSettings;
Expand All @@ -20,6 +23,9 @@
import java.util.Map;
import java.util.concurrent.Executors;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class S3AsyncServiceTests extends OpenSearchTestCase implements ConfigPathSupport {

@Override
Expand Down Expand Up @@ -92,4 +98,11 @@ public void testCachedClientsWithCredentialsAreReleased() {
final S3ClientSettings clientSettingsReloaded = s3AsyncService.settings(metadata1);
assertNotSame(clientSettings, clientSettingsReloaded);
}

public void testS3AsyncServiceDefaultRetryMechanism() {
S3ClientSettings s3ClientSettings = mock(S3ClientSettings.class);
when(s3ClientSettings.throttleRetries).thenReturn(true);
ClientOverrideConfiguration clientOverrideConfiguration = S3AsyncService.buildOverrideConfiguration(s3ClientSettings);
assertTrue(clientOverrideConfiguration.retryPolicy().get().backoffStrategy() instanceof FullJitterBackoffStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;

import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.Settings;

import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class S3ServiceTests extends AbstractS3RepositoryTestCase {
public void testCachedClientsAreReleased() {
final S3Service s3Service = new S3Service(configPath());
Expand Down Expand Up @@ -82,4 +88,11 @@ public void testCachedClientsWithCredentialsAreReleased() {
final S3ClientSettings clientSettingsReloaded = s3Service.settings(metadata1);
assertNotSame(clientSettings, clientSettingsReloaded);
}

public void testS3ServiceDefaultRetryMechanism() {
S3ClientSettings s3ClientSettings = mock(S3ClientSettings.class);
when(s3ClientSettings.throttleRetries).thenReturn(true);
ClientOverrideConfiguration clientOverrideConfiguration = S3AsyncService.buildOverrideConfiguration(s3ClientSettings);
assertTrue(clientOverrideConfiguration.retryPolicy().get().backoffStrategy() instanceof FullJitterBackoffStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateSettings;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.test.InternalTestCluster;
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testBootstrapRemoteClusterEnabled() {
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder()
.put(internalCluster().getDefaultSettings())
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build()
);
expectThrows(() -> unsafeBootstrap(environment), UnsafeBootstrapClusterManagerCommand.REMOTE_CLUSTER_STATE_ENABLED_NODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.is;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -68,8 +69,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.put(
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE
)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT;
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS;
Expand Down Expand Up @@ -112,14 +114,11 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE
)
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
)
.put(CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(), hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : "")
.put(
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(),
hasRemoteRoutingCharPrefix ? REMOTE_ROUTING_PREFIX : ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Arrays;
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Releasable startElectionScheduler(TimeValue gracePeriod, Runnable schedul
}

@SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE")
private static long nonNegative(long n) {
public static long nonNegative(long n) {
return n == Long.MIN_VALUE ? 0 : Math.abs(n);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import java.util.Locale;
import java.util.Objects;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING;

/**
* Tool to run an unsafe bootstrap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateSettings;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
Expand All @@ -49,6 +50,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getRandomDownloadJitterDelay;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand All @@ -68,11 +70,13 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;
private final String clusterName;
private RemoteClusterStateSettings remoteClusterStateSettings;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
RemoteClusterStateSettings remoteClusterStateSettings,
ThreadPool threadpool,
String clusterName
) {
Expand All @@ -82,6 +86,7 @@ public InternalRemoteRoutingTableService(
this.threadPool = threadpool;
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
this.remoteClusterStateSettings = remoteClusterStateSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand Down Expand Up @@ -193,7 +198,11 @@ public void getAsyncIndexRoutingReadAction(

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
remoteIndexRoutingTableStore.readAsyncWithDelay(
getRandomDownloadJitterDelay(remoteClusterStateSettings),
remoteIndexRoutingTable,
actionListener
);
}

@Override
Expand All @@ -208,7 +217,12 @@ public void getAsyncIndexRoutingTableDiffReadAction(
);

RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor);
remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener);

remoteRoutingTableDiffStore.readAsyncWithDelay(
getRandomDownloadJitterDelay(remoteClusterStateSettings),
remoteRoutingTableDiff,
actionListener
);
}

@Override
Expand Down
Loading

0 comments on commit 5735031

Please sign in to comment.