diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 4bd67e66ebcbd..76e5afcb882e7 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -37,8 +37,6 @@ import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage; -import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SuppressForbidden; @@ -49,26 +47,18 @@ import org.opensearch.common.settings.MockSecureSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase; import org.opensearch.repositories.s3.utils.AwsRequestSigner; -import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.snapshots.mockstore.BlobStoreWrapper; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -79,16 +69,11 @@ import fixture.s3.S3HttpHandler; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") // Need to set up a new cluster for each test because cluster settings use randomized authentication settings @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase { - - private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(10L); - private final String region = "test-region"; private String signerOverride; private String previousOpenSearchPathConf; @@ -166,56 +151,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return builder.build(); } - public void testEnforcedCooldownPeriod() throws IOException { - final String repoName = createRepository( - randomName(), - Settings.builder().put(repositorySettings()).put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build() - ); - - final SnapshotId fakeOldSnapshot = client().admin() - .cluster() - .prepareCreateSnapshot(repoName, "snapshot-old") - .setWaitForCompletion(true) - .setIndices() - .get() - .getSnapshotInfo() - .snapshotId(); - final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); - final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); - final RepositoryData repositoryData = getRepositoryData(repository); - final RepositoryData modifiedRepositoryData = repositoryData.withVersions( - Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()) - ); - final BytesReference serialized = BytesReference.bytes( - modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT) - ); - PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { - try (InputStream stream = serialized.streamInput()) { - repository.blobStore() - .blobContainer(repository.basePath()) - .writeBlobAtomic( - BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), - stream, - serialized.length(), - true - ); - } - }))); - - final String newSnapshotName = "snapshot-new"; - final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); - } - /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 0e311c9419b24..01638419ccd3d 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -37,9 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; -import org.opensearch.action.ActionRunnable; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -51,7 +49,6 @@ import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.settings.SecureString; @@ -68,9 +65,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.ArrayList; @@ -78,7 +73,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -210,24 +204,6 @@ class S3Repository extends MeteredBlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - /** - * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the - * backwards compatible snapshot format from before - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link LegacyESVersion#V_7_6_0}). - * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when - * doing repository operations in rapid succession on a repository in the old metadata format. - * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository - * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than - * {@link LegacyESVersion#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new - * format and disable the cooldown period. - */ - static final Setting COOLDOWN_PERIOD = Setting.timeSetting( - "cooldown_period", - new TimeValue(3, TimeUnit.MINUTES), - new TimeValue(0, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic - ); - /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -249,12 +225,6 @@ class S3Repository extends MeteredBlobStoreRepository { private volatile String cannedACL; - /** - * Time period to delay repository operations by after finalizing or deleting a snapshot. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private final TimeValue coolDown; - private final AsyncTransferManager asyncUploadUtils; private final S3AsyncService s3AsyncService; private final boolean multipartUploadEnabled; @@ -317,8 +287,6 @@ class S3Repository extends MeteredBlobStoreRepository { validateRepositoryMetadata(metadata); readRepositoryMetadata(); - - coolDown = COOLDOWN_PERIOD.get(metadata.settings()); } private static Map buildLocation(RepositoryMetadata metadata) { @@ -341,9 +309,6 @@ public void finalizeSnapshot( Function stateTransformer, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.finalizeSnapshot( shardGenerations, repositoryStateId, @@ -362,59 +327,9 @@ public void deleteSnapshots( Version repositoryMetaVersion, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener); } - /** - * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private ActionListener delayedListener(ActionListener listener) { - final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); - assert cancellable != null; - }); - return new ActionListener() { - @Override - public void onResponse(T response) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule( - ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), - coolDown, - ThreadPool.Names.SNAPSHOT - ) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - - @Override - public void onFailure(Exception e) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - }; - } - - private void logCooldownInfo() { - logger.info( - "Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" - + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " - + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " - + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", - coolDown, - metadata.name(), - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION - ); - } - @Override protected S3BlobStore createBlobStore() { return new S3BlobStore(