Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support pinned timestamp in delete flow #15722

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -101,6 +102,8 @@

private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory;

private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory;

@Override
protected String executor() {
return ThreadPool.Names.SAME;
Expand Down Expand Up @@ -128,6 +131,11 @@
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,

Check warning on line 135 in server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java#L135

Added line #L135 was not covered by tests
threadPool,
remoteStoreSettings.getSegmentsPathFixedPrefix()
);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
() -> repositoriesService,
remoteStoreSettings.getSegmentsPathFixedPrefix()
Expand Down Expand Up @@ -291,6 +299,7 @@
repositoryStateId,
snapshotsService.minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null),
remoteStoreLockManagerFactory,
remoteSegmentStoreDirectoryFactory,
ActionListener.wrap(result -> after(null, result), e -> after(e, null))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -42,6 +43,7 @@
*
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;

Expand Down Expand Up @@ -220,11 +222,59 @@
/**
* Deletes snapshots and releases respective lock files from remote store repository.
*
* @param snapshotIds snapshot ids
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files
* @param listener completion listener
* @param snapshotIds snapshot ids
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteStoreLockManagerFactory RemoteStoreLockManagerFactory to be used for cleaning up remote store lock files
* @param remoteSegmentStoreDirectoryFactory RemoteSegmentStoreDirectoryFactory to be used for cleaning up remote store segment files
* @param remoteStorePinnedTimestampService service for pinning and unpinning of the timestamp
* @param snapshotIdsPinnedTimestampMap map of snapshots ids and the pinned timestamp
* @param isShallowSnapshotV2 true for shallow snapshots v2
* @param listener completion listener
*/
default void deleteSnapshotsInternal(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService,
Map<SnapshotId, Long> snapshotIdsPinnedTimestampMap,
boolean isShallowSnapshotV2,
ActionListener<RepositoryData> listener
) {
throw new UnsupportedOperationException();

Check warning on line 246 in server/src/main/java/org/opensearch/repositories/Repository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/Repository.java#L246

Added line #L246 was not covered by tests
}

/**
* Deletes snapshots and unpin the snapshot timestamp using remoteStorePinnedTimestampService
*
* @param snapshotsWithPinnedTimestamp map of snapshot ids and the pinned timestamps
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param remoteSegmentStoreDirectoryFactory RemoteSegmentStoreDirectoryFactory to be used for cleaning up remote store segment files
* @param remoteStorePinnedTimestampService service for pinning and unpinning of the timestamp
* @param listener completion listener
*/
default void deleteSnapshotsWithPinnedTimestamp(
Map<SnapshotId, Long> snapshotsWithPinnedTimestamp,
long repositoryStateId,
Version repositoryMetaVersion,
RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService,
ActionListener<RepositoryData> listener
) {
throw new UnsupportedOperationException();

Check warning on line 267 in server/src/main/java/org/opensearch/repositories/Repository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/Repository.java#L267

Added line #L267 was not covered by tests
}

/**
* Deletes snapshots and releases respective lock files from remote store repository
*
* @param snapshotIds
* @param repositoryStateId
* @param repositoryMetaVersion
* @param remoteStoreLockManagerFactory
* @param listener
*/
default void deleteSnapshotsAndReleaseLockFiles(
Collection<SnapshotId> snapshotIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public final class RepositoryData {
* The indices found in the repository across all snapshots, as a name to {@link IndexId} mapping
*/
private final Map<String, IndexId> indices;

public Map<IndexId, List<SnapshotId>> getIndexSnapshots() {
return indexSnapshots;
}

/**
* The snapshots that each index belongs to.
*/
Expand Down
Loading
Loading