From 5735031fa36096c1709bc1b6aa9f3993c4c97c5d Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Sep 2024 16:56:44 +0530 Subject: [PATCH] Add jitter for remote download calls Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../repositories/s3/S3AsyncService.java | 5 +- .../opensearch/repositories/s3/S3Service.java | 3 + .../s3/AwsS3ServiceImplTests.java | 3 +- .../repositories/s3/S3AsyncServiceTests.java | 13 ++ .../repositories/s3/S3ServiceTests.java | 13 ++ .../UnsafeBootstrapAndDetachCommandIT.java | 4 +- .../RemoteClusterStateCleanupManagerIT.java | 4 +- .../remote/RemoteClusterStateServiceIT.java | 2 +- .../RemoteClusterStateTermVersionIT.java | 4 +- .../remote/RemoteRoutingTableServiceIT.java | 9 +- .../remote/RemoteStatePublicationIT.java | 17 +- .../MigrationBaseTestCase.java | 2 +- .../RemoteStoreClusterStateRestoreIT.java | 2 +- .../RemoteStoreUploadIndexPathIT.java | 2 +- .../coordination/CoordinationState.java | 2 +- .../ElectionSchedulerFactory.java | 2 +- .../UnsafeBootstrapClusterManagerCommand.java | 2 +- .../InternalRemoteRoutingTableService.java | 18 +- .../RemoteRoutingTableServiceFactory.java | 11 +- .../AbstractRemoteWritableEntityManager.java | 24 ++- .../remote/RemoteWritableEntityStore.java | 2 + .../RemoteWriteableEntityBlobStore.java | 17 +- .../common/settings/ClusterSettings.java | 18 +- .../gateway/remote/DefaultRandomObject.java | 22 +++ .../RemoteClusterStateAttributesManager.java | 2 + .../remote/RemoteClusterStateService.java | 148 ++++------------ .../remote/RemoteClusterStateSettings.java | 167 ++++++++++++++++++ .../remote/RemoteClusterStateUtils.java | 16 ++ .../remote/RemoteGlobalMetadataManager.java | 2 + .../remote/RemoteIndexMetadataManager.java | 2 + .../remotestore/RemoteStoreNodeAttribute.java | 6 +- .../coordination/CoordinationStateTests.java | 7 +- ...RemoteRoutingTableServiceFactoryTests.java | 5 +- .../RemoteRoutingTableServiceTests.java | 14 +- .../GatewayMetaStatePersistedStateTests.java | 5 +- 
...oteClusterStateAttributesManagerTests.java | 48 +++++ ...RemoteClusterStateCleanupManagerTests.java | 2 +- .../RemoteClusterStateServiceTests.java | 57 +++--- .../RemoteGlobalMetadataManagerTests.java | 33 ++++ .../RemoteIndexMetadataManagerTests.java | 41 +++++ .../remote/RemoteIndexPathUploaderTests.java | 4 +- .../BlobStoreRepositoryRemoteIndexTests.java | 4 +- ...StaticSettingsOpenSearchIntegTestCase.java | 2 +- 44 files changed, 572 insertions(+), 195 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java diff --git a/CHANGELOG.md b/CHANGELOG.md index ae0b0e0e7a235..17e62f10283b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) - Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967)) - Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430)) +- [Remote Publication] Add jitter for remote publication download calls ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index d691cad9c9d03..8bbef168de89c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ 
b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -18,6 +18,7 @@ import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; @@ -239,7 +240,9 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett RetryPolicy.builder() .numRetries(clientSettings.maxRetries) .throttlingBackoffStrategy( - clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy() : BackoffStrategy.none() + clientSettings.throttleRetries + ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) + : BackoffStrategy.none() ) .build() ) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index fe81da31432f4..3d5e121778ba9 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.SystemPropertyTlsKeyManagersProvider; @@ -330,6 +331,8 @@ static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSett ); if 
(!clientSettings.throttleRetries) { retryPolicy.throttlingBackoffStrategy(BackoffStrategy.none()); + } else { + retryPolicy.throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)); } return clientOverrideConfiguration.retryPolicy(retryPolicy.build()).build(); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java index b80b857644f2a..e7312157d7a33 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/AwsS3ServiceImplTests.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.core.retry.backoff.BackoffStrategy; import software.amazon.awssdk.http.apache.ProxyConfiguration; @@ -364,7 +365,7 @@ private void launchAWSConfigurationTest( if (expectedUseThrottleRetries) { assertThat( clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(), - is(BackoffStrategy.defaultThrottlingStrategy()) + is(BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)) ); } else { assertThat(clientOverrideConfiguration.retryPolicy().get().throttlingBackoffStrategy(), is(BackoffStrategy.none())); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java index de9ad46bb222d..b14c30427ad00 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java +++ 
b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java @@ -8,6 +8,9 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy; + import org.opensearch.cli.SuppressForbidden; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.MockSecureSettings; @@ -20,6 +23,9 @@ import java.util.Map; import java.util.concurrent.Executors; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class S3AsyncServiceTests extends OpenSearchTestCase implements ConfigPathSupport { @Override @@ -92,4 +98,11 @@ public void testCachedClientsWithCredentialsAreReleased() { final S3ClientSettings clientSettingsReloaded = s3AsyncService.settings(metadata1); assertNotSame(clientSettings, clientSettingsReloaded); } + + public void testS3AsyncServiceDefaultRetryMechanism() { + S3ClientSettings s3ClientSettings = mock(S3ClientSettings.class); + when(s3ClientSettings.throttleRetries).thenReturn(true); + ClientOverrideConfiguration clientOverrideConfiguration = S3AsyncService.buildOverrideConfiguration(s3ClientSettings); + assertTrue(clientOverrideConfiguration.retryPolicy().get().backoffStrategy() instanceof FullJitterBackoffStrategy); + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ServiceTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ServiceTests.java index 400905eec8b1c..806f8078b01e0 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ServiceTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ServiceTests.java @@ -31,12 +31,18 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import 
software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy; + import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.settings.MockSecureSettings; import org.opensearch.common.settings.Settings; import java.util.Map; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class S3ServiceTests extends AbstractS3RepositoryTestCase { public void testCachedClientsAreReleased() { final S3Service s3Service = new S3Service(configPath()); @@ -82,4 +88,11 @@ public void testCachedClientsWithCredentialsAreReleased() { final S3ClientSettings clientSettingsReloaded = s3Service.settings(metadata1); assertNotSame(clientSettings, clientSettingsReloaded); } + + public void testS3ServiceDefaultRetryMechanism() { + S3ClientSettings s3ClientSettings = mock(S3ClientSettings.class); + when(s3ClientSettings.throttleRetries).thenReturn(true); + ClientOverrideConfiguration clientOverrideConfiguration = S3AsyncService.buildOverrideConfiguration(s3ClientSettings); + assertTrue(clientOverrideConfiguration.retryPolicy().get().backoffStrategy() instanceof FullJitterBackoffStrategy); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java index b30eb1f3e3b39..b2e502f582580 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java @@ -44,7 +44,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.PersistedClusterStateService; -import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import 
org.opensearch.indices.IndicesService; import org.opensearch.node.Node.DiscoverySettings; import org.opensearch.test.InternalTestCluster; @@ -185,7 +185,7 @@ public void testBootstrapRemoteClusterEnabled() { final Environment environment = TestEnvironment.newEnvironment( Settings.builder() .put(internalCluster().getDefaultSettings()) - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build() ); expectThrows(() -> unsafeBootstrap(environment), UnsafeBootstrapClusterManagerCommand.REMOTE_CLUSTER_STATE_ENABLED_NODE); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java index cf17a58d937de..cc92ac52fb9f3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java @@ -39,8 +39,8 @@ import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD; import static 
org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index f6c7355ea06f6..d46f7afaa83eb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -26,7 +26,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateTermVersionIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateTermVersionIT.java index 256c2ef44b078..749808f73a5ed 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateTermVersionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateTermVersionIT.java @@ -39,8 +39,8 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static 
org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.is; diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java index d143cbd7c3450..8e1f54819b905 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -37,8 +37,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -68,8 +69,8 @@ protected Settings nodeSettings(int nodeOrdinal) { 
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO) .put(REMOTE_PUBLICATION_SETTING_KEY, true) .put( - RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), - RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), + RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE ) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index ffb9352e8ba47..6bbb699bb1f34 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -52,9 +52,11 @@ import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING; +import static 
org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; @@ -112,14 +114,11 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath) .put( - RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), - RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), + RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE ) .put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled) - .put( - RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(), - hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : "" - ) + .put(CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(), hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : "") .put( RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(), hasRemoteRoutingCharPrefix ? 
REMOTE_ROUTING_PREFIX : "" diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 17a9c3ddbe317..95cf0291b844b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -48,7 +48,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java index d078ba05faa12..3e17778b14a71 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -58,7 +58,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING; import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK; import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING; -import static 
org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString; import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java index 44c02dbb6d611..4b6a22edd6607 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.concurrent.ExecutionException; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..69d1996a61510 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -53,7 +53,7 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static 
org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java index 828db5864d28b..4db1656c1d799 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java @@ -156,7 +156,7 @@ public Releasable startElectionScheduler(TimeValue gracePeriod, Runnable schedul } @SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE") - private static long nonNegative(long n) { + public static long nonNegative(long n) { return n == Long.MIN_VALUE ? 
0 : Math.abs(n); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java index 168ae5212888f..c65184faa0be5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java @@ -53,7 +53,7 @@ import java.util.Locale; import java.util.Objects; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; /** * Tool to run an unsafe bootstrap diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 220093b428989..27e294c88cc2c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -27,6 +27,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -49,6 +50,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getRandomDownloadJitterDelay; import static 
org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; /** @@ -68,11 +70,13 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; private final String clusterName; + private RemoteClusterStateSettings remoteClusterStateSettings; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadpool, String clusterName ) { @@ -82,6 +86,7 @@ public InternalRemoteRoutingTableService( this.threadPool = threadpool; this.clusterName = clusterName; this.clusterSettings = clusterSettings; + this.remoteClusterStateSettings = remoteClusterStateSettings; } public List getIndicesRouting(RoutingTable routingTable) { @@ -193,7 +198,11 @@ public void getAsyncIndexRoutingReadAction( RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); + remoteIndexRoutingTableStore.readAsyncWithDelay( + getRandomDownloadJitterDelay(remoteClusterStateSettings), + remoteIndexRoutingTable, + actionListener + ); } @Override @@ -208,7 +217,12 @@ public void getAsyncIndexRoutingTableDiffReadAction( ); RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor); - remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener); + + remoteRoutingTableDiffStore.readAsyncWithDelay( + getRandomDownloadJitterDelay(remoteClusterStateSettings), + remoteRoutingTableDiff, + actionListener + ); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 
56dfa03215a64..8c6f69ccbcb04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; @@ -34,11 +35,19 @@ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadPool, String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); + return new InternalRemoteRoutingTableService( + repositoriesService, + settings, + clusterSettings, + remoteClusterStateSettings, + threadPool, + clusterName + ); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 8e2de1580a49f..e9c66b50b1283 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -10,10 +10,15 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.DefaultRandomObject; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.model.RemoteReadResult; import java.util.HashMap; import java.util.Map; +import java.util.Random; + 
+import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getRandomDownloadJitterDelay; /** * An abstract class that provides a base implementation for managing remote entities in the remote store. @@ -24,6 +29,14 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita */ protected final Map remoteWritableEntityStores = new HashMap<>(); + /** + * RemoteClusterStateSettings to add jitter in the download calls. + * Intentionally set to null to make sure the inheriting class configures it correctly. + */ + protected RemoteClusterStateSettings remoteClusterStateSettings = null; + + protected final Random random = DefaultRandomObject.INSTANCE; + /** * Retrieves the remote writable entity store for the given entity. * @@ -68,6 +81,10 @@ protected abstract ActionListener getWrappedReadListener( ActionListener listener ); + public void addRemoteWritableEntityStore(String EntityName, RemoteWritableEntityStore remoteWritableEntityStore) { + remoteWritableEntityStores.put(EntityName, remoteWritableEntityStore); + } + @Override public void writeAsync( String component, @@ -79,6 +96,11 @@ public void writeAsync( @Override public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener listener) { - getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener)); + getStore(entity).readAsyncWithDelay( + getRandomDownloadJitterDelay(remoteClusterStateSettings), + entity, + getWrappedReadListener(component, entity, listener) + ); } + } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java index 385c6f20ba58d..2b1edfbf4b987 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java @@ -27,4 +27,6 @@ public interface
RemoteWritableEntityStore public T read(U entity) throws IOException; public void readAsync(U entity, ActionListener listener); + + public void readAsyncWithDelay(long delayInMillis, U entity, ActionListener listener); } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java index b5e074874dd38..3da718dbf958c 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java @@ -10,6 +10,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -32,8 +33,10 @@ public class RemoteWriteableEntityBlobStore listener) { }); } + public void readAsyncWithDelay(final long delayMillis, final U entity, final ActionListener listener) { + Runnable runnable = () -> { + try { + listener.onResponse(read(entity)); + } catch (Exception e) { + listener.onFailure(e); + } + }; + threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), executor, runnable); + } + public String getClusterName() { return clusterName; } @@ -121,5 +137,4 @@ public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity obj) { private static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } - } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ecdd23530c648..064c277f2fd59 100644 --- 
a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -107,7 +107,6 @@ import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteIndexMetadataManager; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.http.HttpTransportSettings; @@ -187,6 +186,12 @@ import java.util.Set; import java.util.function.Predicate; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_MAX_JITTER; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING; @@ -733,13 +738,13 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, - 
RemoteClusterStateService.REMOTE_PUBLICATION_SETTING, + REMOTE_CLUSTER_STATE_ENABLED_SETTING, + REMOTE_PUBLICATION_SETTING, INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING, - RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, + REMOTE_STATE_READ_TIMEOUT_SETTING, + CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, @@ -748,7 +753,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_STATE_READ_MAX_JITTER, RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX, // Admission Control Settings diff --git a/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java new file mode 100644 index 0000000000000..ead947f6826ff --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.Randomness; + +import java.util.Random; + +/** + * Utility to provide a {@link Random} static instance + * + * @opensearch.internal + */ +public class DefaultRandomObject { + public static final Random INSTANCE = new Random(Randomness.get().nextLong()); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 877e2585cb1eb..7f49f5e1006d0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -39,12 +39,14 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; RemoteClusterStateAttributesManager( + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadpool ) { + this.remoteClusterStateSettings = remoteClusterStateSettings; this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteWriteableEntityBlobStore<>( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..b1584b2670e49 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -37,8 +37,6 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; -import 
org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; @@ -47,6 +45,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateSettings.RemoteClusterStateValidationMode; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; @@ -71,7 +70,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -123,91 +121,6 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - /** - * Gates the functionality of remote publication. 
- */ - public static final String REMOTE_PUBLICATION_SETTING_KEY = "cluster.remote_store.publication.enabled"; - - public static final Setting REMOTE_PUBLICATION_SETTING = Setting.boolSetting( - REMOTE_PUBLICATION_SETTING_KEY, - false, - Property.NodeScope, - Property.Final - ); - - /** - * Used to specify if cluster state metadata should be published to remote store - */ - public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( - "cluster.remote_store.state.enabled", - false, - Property.NodeScope, - Property.Final - ); - - public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final Setting REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.read_timeout", - REMOTE_STATE_READ_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( - "cluster.remote_store.state.checksum_validation.mode", - RemoteClusterStateValidationMode.NONE.name(), - RemoteClusterStateValidationMode::parseString, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Controls the fixed prefix for the cluster state path on remote store. - */ - public static final Setting CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString( - "cluster.remote_store.state.path.prefix", - "", - Property.NodeScope, - Property.Final - ); - - /** - * Validation mode for cluster state checksum. - * None: Validation will be disabled. - * Debug: Validation enabled but only matches checksum and logs failing entities. - * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures. - * Failure: Throws exception on failing validation. 
- */ - public enum RemoteClusterStateValidationMode { - DEBUG("debug"), - TRACE("trace"), - FAILURE("failure"), - NONE("none"); - - public final String mode; - - RemoteClusterStateValidationMode(String mode) { - this.mode = mode; - } - - public static RemoteClusterStateValidationMode parseString(String mode) { - try { - return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - "[" - + mode - + "] mode is not supported. " - + "supported modes are [" - + Arrays.toString(RemoteClusterStateValidationMode.values()) - + "]" - ); - } - } - } - - private TimeValue remoteStateReadTimeout; private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -218,7 +131,6 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private BlobStoreTransferService blobStoreTransferService; private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; - private RemoteClusterStateValidationMode remoteClusterStateValidationMode; private final RemotePersistenceStats remoteStateStats; private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; @@ -233,7 +145,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; private final boolean isPublicationEnabled; - private final String remotePathPrefix; + private RemoteClusterStateSettings remoteClusterStateSettings; private final RemoteClusterStateCache remoteClusterStateCache; // ToXContent Params with gateway mode. 
@@ -265,22 +177,18 @@ public RemoteClusterStateService( clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); - clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); - this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode); - this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings) + this.remoteClusterStateSettings = new RemoteClusterStateSettings(settings, clusterSettings); + this.isPublicationEnabled = remoteClusterStateSettings.getRemotePublicationSetting() && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); - this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, clusterSettings, + remoteClusterStateSettings, threadpool, ClusterName.CLUSTER_NAME_SETTING.get(settings).value() ); @@ -332,7 +240,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? 
new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode() + .equals(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, false, codecVersion ); @@ -539,7 +448,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState) + : null, false, previousManifest.getCodecVersion() ); @@ -764,7 +675,7 @@ UploadedMetadataResults writeMetadataInParallel( blobStoreRepository.getNamedXContentRegistry(), remoteIndexMetadataManager.getPathTypeSetting(), remoteIndexMetadataManager.getPathHashAlgoSetting(), - remotePathPrefix + remoteClusterStateSettings.getRemotePathPrefix() ), listener ); @@ -1010,7 +921,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? 
new ClusterStateChecksum(clusterState) + : null, true, previousManifest.getCodecVersion() ); @@ -1065,9 +978,9 @@ public void start() { blobStoreRepository = (BlobStoreRepository) repository; String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); - remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1076,6 +989,7 @@ public void start() { ); remoteIndexMetadataManager = new RemoteIndexMetadataManager( clusterSettings, + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1090,6 +1004,7 @@ public void start() { threadpool ); remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1105,10 +1020,6 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } - private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteClusterStateValidationMode) { - this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; - } - // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; @@ -1355,9 +1266,10 @@ ClusterState readClusterStateInParallel( } try { - if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { + if (latch.await(remoteClusterStateSettings.getRemoteStateReadTimeout().getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( - "Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout + "Timed out waiting to read cluster state from remote within timeout " + + 
remoteClusterStateSettings.getRemoteStateReadTimeout() ); exceptionList.forEach(exception::addSuppressed); throw exception; @@ -1484,7 +1396,7 @@ public ClusterState getClusterStateForManifest( ); if (includeEphemeral - && !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + && !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true); } @@ -1516,10 +1428,12 @@ public ClusterState getClusterStateForManifest( final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateFullDownloadSucceeded(); remoteStateStats.stateFullDownloadTook(durationMillis); + if (includeEphemeral) { // cache only if the entire cluster-state is present remoteClusterStateCache.putState(clusterState); } + return clusterState; } @@ -1611,7 +1525,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C .metadata(metadataBuilder) .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) .build(); - if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { + if (!remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false); } final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -1650,13 +1565,14 @@ void validateClusterStateFromChecksum( remoteStateStats.stateDiffDownloadValidationFailed(); } - if (isFullStateDownload && remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if 
(isFullStateDownload + && remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during full state read. Validation failed for " + failedValidation ); } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE) - || remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.TRACE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE) + || remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.TRACE)) { // download full cluster state and match against state created for the failing entities ClusterState fullClusterState = readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), @@ -1790,7 +1706,7 @@ void validateClusterStateFromChecksum( } } } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during diff read. 
Validation failed for " + failedValidation ); @@ -1827,10 +1743,6 @@ public boolean isRemotePublicationEnabled() { return this.isPublicationEnabled; } - public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { - this.remoteStateReadTimeout = remoteStateReadTimeout; - } - private BlobStoreTransferService getBlobStoreTransferService() { if (blobStoreTransferService == null) { blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java new file mode 100644 index 0000000000000..a883741a3051c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.Locale; + +/** + * Settings for remote cluster state + * + * @opensearch.api + */ +public class RemoteClusterStateSettings { + + /** + * Gates the functionality of remote publication. 
+ */ + public static final String REMOTE_PUBLICATION_SETTING_KEY = "cluster.remote_store.publication.enabled"; + + public static final Setting<Boolean> REMOTE_PUBLICATION_SETTING = Setting.boolSetting( + REMOTE_PUBLICATION_SETTING_KEY, + false, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** + * Used to specify if cluster state metadata should be published to remote store + */ + public static final Setting<Boolean> REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.state.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting<TimeValue> REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.read_timeout", + REMOTE_STATE_READ_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting<RemoteClusterStateValidationMode> REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( + "cluster.remote_store.state.checksum_validation.mode", + RemoteClusterStateValidationMode.NONE.name(), + RemoteClusterStateValidationMode::parseString, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Controls the fixed prefix for the cluster state path on remote store.
+ */ + public static final Setting<String> CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString( + "cluster.remote_store.state.path.prefix", + "", + Setting.Property.NodeScope, + Setting.Property.Final + ); + + public static final TimeValue REMOTE_STATE_READ_MAX_JITTER_DEFAULT = TimeValue.timeValueMillis(500); + + public static final Setting<TimeValue> REMOTE_STATE_READ_MAX_JITTER = Setting.timeSetting( + "cluster.remote_store.state.read.max_jitter", + REMOTE_STATE_READ_MAX_JITTER_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private TimeValue remoteStateReadTimeout; + private RemoteClusterStateValidationMode remoteClusterStateValidationMode; + private final String remotePathPrefix; + private TimeValue remoteStateReadMaxJitter; + private boolean remotePublicationSetting; + + public RemoteClusterStateSettings(Settings settings, ClusterSettings clusterSettings) { + this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); + this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode); + this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); + this.remoteStateReadMaxJitter = REMOTE_STATE_READ_MAX_JITTER.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_MAX_JITTER, this::setRemoteStateReadMaxJitter); + this.remotePublicationSetting = REMOTE_PUBLICATION_SETTING.get(settings); + } + + public boolean getRemotePublicationSetting() { + return remotePublicationSetting; + } + + private void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { + this.remoteStateReadTimeout = remoteStateReadTimeout; + } + + private void setChecksumValidationMode(RemoteClusterStateValidationMode
remoteClusterStateValidationMode) { + this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; + } + + private void setRemoteStateReadMaxJitter(TimeValue remoteStateReadMaxJitter) { + this.remoteStateReadMaxJitter = remoteStateReadMaxJitter; + } + + public TimeValue getRemoteStateReadTimeout() { + return remoteStateReadTimeout; + } + + public RemoteClusterStateValidationMode getRemoteClusterStateValidationMode() { + return remoteClusterStateValidationMode; + } + + public String getRemotePathPrefix() { + return remotePathPrefix; + } + + public TimeValue getRemoteStateReadMaxJitter() { + return remoteStateReadMaxJitter; + } + + /** + * Validation mode for cluster state checksum. + * None: Validation will be disabled. + * Debug: Validation enabled but only matches checksum and logs failing entities. + * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures. + * Failure: Throws exception on failing validation. + */ + public enum RemoteClusterStateValidationMode { + DEBUG("debug"), + TRACE("trace"), + FAILURE("failure"), + NONE("none"); + + public final String mode; + + RemoteClusterStateValidationMode(String mode) { + this.mode = mode; + } + + public static RemoteClusterStateValidationMode parseString(String mode) { + try { + return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "[" + + mode + + "] mode is not supported. 
" + + "supported modes are [" + + Arrays.toString(RemoteClusterStateValidationMode.values()) + + "]" + ); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index 74cb838286961..f7101a50878b6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -22,6 +22,7 @@ import java.util.Locale; import java.util.Map; +import static org.opensearch.cluster.coordination.ElectionSchedulerFactory.nonNegative; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; /** @@ -73,6 +74,21 @@ static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepositor ); } + public static long getRandomDownloadJitterDelay(RemoteClusterStateSettings remoteClusterStateSettings) { + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + return toPositiveLongAtMost(DefaultRandomObject.INSTANCE.nextLong(), maxDelayInMillis); + } + + /** + * @param randomNumber a randomly-chosen long + * @param upperBound inclusive upper bound + * @return a number in the range (0, upperBound] + */ + static long toPositiveLongAtMost(long randomNumber, long upperBound) { + assert 0 < upperBound : upperBound; + return nonNegative(randomNumber) % upperBound + 1; + } + /** * Container class to keep metadata of all uploaded attributes */ diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 763a8e3ff4951..7b8ced7852519 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -73,6 +73,7 @@ public class 
RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan RemoteGlobalMetadataManager( ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, @@ -83,6 +84,7 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; + this.remoteClusterStateSettings = remoteClusterStateSettings; this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteWriteableEntityBlobStore<>( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index d8e8ffc68834d..570bb31cd7de9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -79,6 +79,7 @@ public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityMana public RemoteIndexMetadataManager( ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, @@ -100,6 +101,7 @@ public RemoteIndexMetadataManager( this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.pathType = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING); this.pathHashAlgo = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING); + this.remoteClusterStateSettings = remoteClusterStateSettings; clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); 
clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, this::setPathTypeSetting); clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 55971398634c5..3e60de66cec55 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -28,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + /** * This is an abstraction for validating and storing information specific to remote backed storage nodes. 
* @@ -194,8 +195,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) { } public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { - return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) - && isRemoteClusterStateAttributePresent(settings); + return REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateAttributePresent(settings); } private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 32cb95e0c04f6..59284041eb71c 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -48,6 +48,7 @@ import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; @@ -68,8 +69,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static 
org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -1394,7 +1395,7 @@ private static Settings remoteStateSettings() { .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); return settings; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java index 683942fd34a37..02b49a2638806 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; @@ -19,7 +20,7 @@ import java.util.function.Supplier; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static 
org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { @@ -39,6 +40,7 @@ public void testGetServiceWhenRemoteRoutingDisabled() { repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new RemoteClusterStateSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), threadPool, "test-cluster" ); @@ -55,6 +57,7 @@ public void testGetServiceWhenRemoteRoutingEnabled() { repositoriesService, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new RemoteClusterStateSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), threadPool, "test-cluster" ); diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 63501f878d55d..5cb9380bb0d6b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -28,12 +28,14 @@ import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import 
org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStorePathStrategy; @@ -63,8 +65,8 @@ import org.mockito.Mockito; import static org.opensearch.gateway.remote.ClusterMetadataManifestTests.randomUploadedIndexMetadataList; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; @@ -103,6 +105,7 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private ClusterService clusterService; private Compressor compressor; private BlobStoreTransferService blobStoreTransferService; + private RemoteClusterStateSettings remoteClusterStateSettings; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -129,10 +132,14 @@ public void setup() { compressor = new NoneCompressor(); basePath = BlobPath.cleanPath().add("base-path"); when(blobStoreRepository.basePath()).thenReturn(basePath); + remoteClusterStateSettings = mock(RemoteClusterStateSettings.class); + TimeValue maxJitter = TimeValue.timeValueMillis(100); + when(remoteClusterStateSettings.getRemoteStateReadMaxJitter()).thenReturn(maxJitter); remoteRoutingTableService = new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + remoteClusterStateSettings, threadPool, "test-cluster" ); @@ -148,12 +155,17 @@ 
public void teardown() throws Exception { public void testFailInitializationWhenRemoteRoutingDisabled() { final Settings settings = Settings.builder().build(); + remoteClusterStateSettings = new RemoteClusterStateSettings( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); assertThrows( AssertionError.class, () -> new InternalRemoteRoutingTableService( repositoriesServiceSupplier, settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + remoteClusterStateSettings, threadPool, "test-cluster" ) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 5ea5241762753..a3def4eeeac30 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -68,6 +68,7 @@ import org.opensearch.gateway.PersistedClusterStateService.Writer; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteUploadStats; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.index.recovery.RemoteStoreRestoreService; @@ -1043,7 +1044,7 @@ public void testGatewayForRemoteState() throws IOException { .put("node.attr." 
+ REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "randomRepoName") .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); gateway.start(settings, nodeEnvironment, xContentRegistry(), persistedStateRegistry); @@ -1268,7 +1269,7 @@ private MockGatewayMetaState newGatewayForRemoteState( .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); final TransportService transportService = mock(TransportService.class); ClusterService clusterService = mock(ClusterService.class); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 67b1528466a9e..f4e0aeac71f66 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -16,8 +16,10 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.TestCapturingListener; import 
org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -42,6 +44,8 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.mockito.ArgumentCaptor; + import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; @@ -71,6 +75,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase { @@ -93,6 +98,7 @@ public void setup() throws Exception { compressor = new NoneCompressor(); remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + new RemoteClusterStateSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), CLUSTER_NAME, blobStoreRepository, blobStoreTransferService, @@ -283,6 +289,48 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName()); } + public void testRemoteClusterStateAttributesManager_ReadAsyncWithDelay() { + + RemoteClusterStateSettings remoteClusterStateSettings = mock(RemoteClusterStateSettings.class); + TimeValue maxJitter = TimeValue.timeValueMillis(100); + when(remoteClusterStateSettings.getRemoteStateReadMaxJitter()).thenReturn(maxJitter); + + remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + remoteClusterStateSettings, + "test-cluster", + mock(BlobStoreRepository.class), + blobStoreTransferService, + namedWriteableRegistry, + threadPool + ); + + 
RemoteWritableEntityStore mockRemoteWritableEntityStore = mock(RemoteWritableEntityStore.class); + remoteClusterStateAttributesManager.addRemoteWritableEntityStore( + RemoteDiscoveryNodes.DISCOVERY_NODES, + mockRemoteWritableEntityStore + ); + + RemoteDiscoveryNodes remoteDiscoveryNodes = mock(RemoteDiscoveryNodes.class); + when(remoteDiscoveryNodes.getType()).thenReturn(RemoteDiscoveryNodes.DISCOVERY_NODES); + + ActionListener listener = mock(ActionListener.class); + + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + + remoteClusterStateAttributesManager.readAsync("testComponent", remoteDiscoveryNodes, listener); + verify(mockRemoteWritableEntityStore).readAsyncWithDelay( + delayCaptor.capture(), + eq(remoteDiscoveryNodes), + any(ActionListener.class) + ); + + long capturedDelay = delayCaptor.getValue(); + + // assert + assertTrue("Delay should be non-negative", capturedDelay >= 0); + assertTrue("Delay should not exceed max jitter", capturedDelay < TimeValue.timeValueMillis(100).millis()); + } + public void testGetAsyncWriteRunnable_Exception() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 8e114c9a26534..916b07ac87bb8 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -121,7 +121,7 @@ public void setup() { .put("node.attr." 
+ REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 56857285fa8d3..13359daa477e7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -45,6 +45,7 @@ import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -112,7 +113,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata1; import static 
org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata2; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata3; @@ -123,6 +124,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getRandomDownloadJitterDelay; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; @@ -220,7 +222,7 @@ public void setup() { .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .build(); List writeableEntries = ClusterModule.getNamedWriteables(); @@ -2760,7 +2762,7 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { Settings newSettings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put("node.attr." 
+ REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); clusterSettings.applySettings(newSettings); @@ -3040,7 +3042,7 @@ private void initializeRoutingTable() { Settings newSettings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .put(REMOTE_PUBLICATION_SETTING_KEY, "true") .build(); clusterSettings.applySettings(newSettings); @@ -3065,12 +3067,12 @@ private void initializeRoutingTable() { ); } - private void initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode mode) { + private void initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode mode) { Settings newSettings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .put("node.attr." 
+ REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), mode.name()) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), mode.name()) .put(REMOTE_PUBLICATION_SETTING_KEY, true) .build(); clusterSettings.applySettings(newSettings); @@ -3096,7 +3098,7 @@ private void initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClust } public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); mockBlobStoreObjects(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3140,7 +3142,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I } public void testWriteFullMetadataSuccessWithChecksumValidationModeNone() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE); mockBlobStoreObjects(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3183,7 +3185,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationModeNone() throws } public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + 
initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); @@ -3235,7 +3237,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t } public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); @@ -3287,7 +3289,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() } public void testGetClusterStateForManifestWithChecksumValidationEnabledWithNullChecksum() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().build(); mockBlobStoreObjects(); remoteClusterStateService.start(); @@ -3346,7 +3348,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithNullC } public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = 
generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3379,7 +3381,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws } public void testGetClusterStateForManifestWithChecksumValidationModeNone() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3412,7 +3414,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw } public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMismatch() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3458,8 +3460,8 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc initializeWithChecksumEnabled( randomFrom( Arrays.asList( - RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG, - RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE + RemoteClusterStateSettings.RemoteClusterStateValidationMode.DEBUG, + RemoteClusterStateSettings.RemoteClusterStateValidationMode.TRACE ) ) ); @@ -3502,7 +3504,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc } public void 
testGetClusterStateUsingDiffWithChecksum() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3544,7 +3546,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException { } public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3586,7 +3588,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio } public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.DEBUG); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3627,7 +3629,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I } public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE); + 
initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.TRACE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3689,7 +3691,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I } public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOException { - initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); + initializeWithChecksumEnabled(RemoteClusterStateSettings.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( new ClusterStateChecksum(clusterState) @@ -3750,6 +3752,17 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed()); } + public void testRemoteClusterStateUtils_getRandomDownloadJitterDelay() { + RemoteClusterStateSettings remoteClusterStateSettings = Mockito.mock(RemoteClusterStateSettings.class); + long maxDelayMillis = 1000; + when(remoteClusterStateSettings.getRemoteStateReadMaxJitter()).thenReturn(TimeValue.timeValueMillis(maxDelayMillis)); + + long result = getRandomDownloadJitterDelay(remoteClusterStateSettings); + + assertTrue("Result should be non-negative", result >= 0); + assertTrue("Result should not exceed max delay", result <= maxDelayMillis); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false, Collections.emptyMap()); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java
b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index a2da1e8b0fdb2..d66cbc7cff384 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -20,8 +20,10 @@ import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -54,6 +56,8 @@ import java.util.function.Function; import java.util.stream.Stream; +import org.mockito.ArgumentCaptor; + import static java.util.stream.Collectors.toList; import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT; @@ -93,6 +97,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteGlobalMetadataManagerTests extends OpenSearchTestCase { @@ -128,6 +133,7 @@ public void setup() { when(blobStoreRepository.basePath()).thenReturn(blobPath); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, + new RemoteClusterStateSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), CLUSTER_NAME, blobStoreRepository, blobStoreTransferService, @@ -350,6 +356,33 @@ public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { 
assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } + public void testRemoteIndexMetadataManager_ReadAsyncWithDelay() { + RemoteClusterStateSettings remoteClusterStateSettings = mock(RemoteClusterStateSettings.class); + TimeValue maxJitter = TimeValue.timeValueMillis(100); + when(remoteClusterStateSettings.getRemoteStateReadMaxJitter()).thenReturn(maxJitter); + remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( + clusterSettings, + remoteClusterStateSettings, + CLUSTER_NAME, + blobStoreRepository, + blobStoreTransferService, + writableRegistry(), + threadPool + ); + RemoteGlobalMetadata remoteEntity = mock(RemoteGlobalMetadata.class); + when(remoteEntity.getType()).thenReturn(GLOBAL_METADATA); + ActionListener listener = mock(ActionListener.class); + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + RemoteWritableEntityStore mockRemoteWritableEntityStore = mock(RemoteWritableEntityStore.class); + remoteGlobalMetadataManager.addRemoteWritableEntityStore(GLOBAL_METADATA, mockRemoteWritableEntityStore); + remoteGlobalMetadataManager.readAsync("testComponent", remoteEntity, listener); + + verify(mockRemoteWritableEntityStore).readAsyncWithDelay(delayCaptor.capture(), eq(remoteEntity), any(ActionListener.class)); + long capturedDelay = delayCaptor.getValue(); + assertTrue("Delay should be non-negative", capturedDelay >= 0); + assertTrue("Delay should not exceed max jitter", capturedDelay < TimeValue.timeValueMillis(100).millis()); + } + public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); String fileName = randomAlphaOfLength(10); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index ac7f9b655e877..2491d8ab6ac9e 100644 --- 
a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java
@@ -18,8 +18,10 @@
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStore;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
+import org.opensearch.common.remote.RemoteWritableEntityStore;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.TestCapturingListener;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.compress.Compressor;
@@ -39,6 +41,8 @@
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
+import org.mockito.ArgumentCaptor;
+
 import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS;
 import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
 import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
@@ -53,6 +57,7 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RemoteIndexMetadataManagerTests extends OpenSearchTestCase {
@@ -75,6 +80,7 @@ public void setup() {
         when(blobStoreRepository.getCompressor()).thenReturn(compressor);
         remoteIndexMetadataManager = new RemoteIndexMetadataManager(
             clusterSettings,
+            new RemoteClusterStateSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
             "test-cluster",
             blobStoreRepository,
             blobStoreTransferService,
@@ -145,6 +151,41 @@ public void testGetAsyncWriteRunnable_IOFailure() throws Exception {
         assertTrue(listener.getFailure() instanceof RemoteStateTransferException);
     }
 
+    public void testRemoteIndexMetadataManager_ReadAsync() {
+        // Verifies that readAsync() delegates to the registered store and passes a jitter delay in [0, maxJitter).
+        RemoteClusterStateSettings remoteClusterStateSettings = mock(RemoteClusterStateSettings.class);
+        TimeValue maxJitter = TimeValue.timeValueMillis(100);
+        when(remoteClusterStateSettings.getRemoteStateReadMaxJitter()).thenReturn(maxJitter);
+
+        remoteIndexMetadataManager = new RemoteIndexMetadataManager(
+            clusterSettings,
+            remoteClusterStateSettings,
+            "test-cluster",
+            blobStoreRepository,
+            blobStoreTransferService,
+            threadPool
+        );
+
+        RemoteWritableEntityStore mockRemoteWritableEntityStore = mock(RemoteWritableEntityStore.class);
+        remoteIndexMetadataManager.addRemoteWritableEntityStore(RemoteIndexMetadata.INDEX, mockRemoteWritableEntityStore);
+
+        RemoteIndexMetadata remoteEntity = mock(RemoteIndexMetadata.class);
+        when(remoteEntity.getType()).thenReturn(RemoteIndexMetadata.INDEX);
+
+        ActionListener listener = mock(ActionListener.class);
+
+        ArgumentCaptor<Long> delayCaptor = ArgumentCaptor.forClass(Long.class); // typed so getValue() unboxes to long
+
+        remoteIndexMetadataManager.readAsync("testComponent", remoteEntity, listener);
+        verify(mockRemoteWritableEntityStore).readAsyncWithDelay(delayCaptor.capture(), eq(remoteEntity), any(ActionListener.class));
+
+        long capturedDelay = delayCaptor.getValue();
+
+        // Assert that the delay is within the expected range: 0 (inclusive) to max jitter (exclusive)
+        assertTrue("Delay should be non-negative", capturedDelay >= 0);
+        assertTrue("Delay should not exceed max jitter", capturedDelay < maxJitter.millis());
+    }
+
     public void testGetAsyncReadRunnable_Success() throws Exception {
         IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10));
         String fileName = randomAlphaOfLength(10);
diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java
index d6519d9db8ee6..9519fe9741e1d 100644
---
a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -21,7 +21,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; -import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; @@ -86,7 +86,7 @@ public void setup() { .put(RemoteIndexPathUploader.TRANSLOG_REPO_NAME_KEY, TRANSLOG_REPO_NAME) .put(RemoteIndexPathUploader.SEGMENT_REPO_NAME_KEY, TRANSLOG_REPO_NAME) .put(CLUSTER_STATE_REPO_KEY, TRANSLOG_REPO_NAME) - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); basePath = BlobPath.cleanPath().add("test"); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java index e280ab8c7a73c..f234c92e051d0 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java @@ -40,7 +40,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; -import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import 
org.opensearch.index.IndexSettings; import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.indices.RemoteStoreSettings; @@ -115,7 +115,7 @@ private Settings buildRemoteStoreNodeAttributes(String repoName, Path repoPath) .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName) .put(repoTypeAttributeKey, FsRepository.TYPE) .put(repoSettingsAttributeKeyPrefix + "location", repoPath) - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) + .put(RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) .build(); } diff --git a/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java index 7d2c9ad686a01..703b80d09bd55 100644 --- a/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java @@ -16,7 +16,7 @@ import java.util.List; import java.util.Objects; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; /**