Skip to content

Commit

Permalink
Remove S3 cooldown period setting
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Oct 5, 2023
1 parent 890663f commit a88734b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;

import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
Expand All @@ -49,26 +47,18 @@
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -79,16 +69,11 @@
import fixture.s3.S3HttpHandler;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase {

private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(10L);

private final String region = "test-region";
private String signerOverride;
private String previousOpenSearchPathConf;
Expand Down Expand Up @@ -166,56 +151,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

public void testEnforcedCooldownPeriod() throws IOException {
final String repoName = createRepository(
randomName(),
Settings.builder().put(repositorySettings()).put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build()
);

final SnapshotId fakeOldSnapshot = client().admin()
.cluster()
.prepareCreateSnapshot(repoName, "snapshot-old")
.setWaitForCompletion(true)
.setIndices()
.get()
.getSnapshotInfo()
.snapshotId();
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final RepositoryData repositoryData = getRepositoryData(repository);
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(
Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())
);
final BytesReference serialized = BytesReference.bytes(
modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT)
);
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(),
stream,
serialized.length(),
true
);
}
})));

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionRunnable;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -51,7 +49,6 @@
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.settings.SecureString;
Expand All @@ -68,17 +65,14 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

Expand Down Expand Up @@ -210,24 +204,6 @@ class S3Repository extends MeteredBlobStoreRepository {

static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

/**
* Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the
* backwards compatible snapshot format from before
* {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link LegacyESVersion#V_7_6_0}).
* This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when
* doing repository operations in rapid succession on a repository in the old metadata format.
* This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository
* becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than
* {@link LegacyESVersion#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new
* format and disable the cooldown period.
*/
static final Setting<TimeValue> COOLDOWN_PERIOD = Setting.timeSetting(
"cooldown_period",
new TimeValue(3, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic
);

/**
* Specifies the path within bucket to repository data. Defaults to root directory.
*/
Expand All @@ -249,12 +225,6 @@ class S3Repository extends MeteredBlobStoreRepository {

private volatile String cannedACL;

/**
* Time period to delay repository operations by after finalizing or deleting a snapshot.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private final TimeValue coolDown;

private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
private final boolean multipartUploadEnabled;
Expand Down Expand Up @@ -317,8 +287,6 @@ class S3Repository extends MeteredBlobStoreRepository {

validateRepositoryMetadata(metadata);
readRepositoryMetadata();

coolDown = COOLDOWN_PERIOD.get(metadata.settings());
}

private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
Expand All @@ -341,9 +309,6 @@ public void finalizeSnapshot(
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(
shardGenerations,
repositoryStateId,
Expand All @@ -362,59 +327,9 @@ public void deleteSnapshots(
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}

/**
* Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
* See {@link #COOLDOWN_PERIOD} for details.
*/
private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
});
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(
ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown,
ThreadPool.Names.SNAPSHOT
)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}

private void logCooldownInfo() {
logger.info(
"Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]"
+ " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid "
+ "repository corruption. To get rid of this message and move to the new repository metadata format, either remove "
+ "all snapshots older than version [{}] from the repository or create a new repository at an empty location.",
coolDown,
metadata.name(),
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION
);
}

@Override
protected S3BlobStore createBlobStore() {
return new S3BlobStore(
Expand Down

0 comments on commit a88734b

Please sign in to comment.