Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Snapshot V2] Move timestamp pinning before cluster state update #16325

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -938,17 +938,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-concurrent-" + snapshotIndex;
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get();
logger.info("Snapshot completed {}", snapshotName);
} catch (Exception e) {}
});
threads.add(thread);
Expand All @@ -963,15 +954,19 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
thread.join();
}

// Validate that only one snapshot has been created
// Sleeping 10 sec for earlier created snapshot to complete runNextQueuedOperation and be ready for next snapshot
// We can't put `waitFor` since we don't have visibility on its completion
Thread.sleep(TimeValue.timeValueSeconds(10).seconds());
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, "snapshot-cleanup-timestamp").setWaitForCompletion(true).get();
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(2));
waitUntil(() -> {
forceSyncPinnedTimestamps();
return RemoteStorePinnedTimestampService.getPinnedEntities().size() == repositoryData.getSnapshotIds().size();
});
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
Expand Down
149 changes: 69 additions & 80 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand Down Expand Up @@ -649,34 +650,35 @@
* @param listener snapshot creation listener
*/
public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
long pinnedTimestamp = System.currentTimeMillis();
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);

final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
long pinnedTimestamp = System.currentTimeMillis();
try {
updateSnapshotPinnedTimestamp(snapshot, pinnedTimestamp);
} catch (Exception e) {
listener.onFailure(e);
return;

Check warning on line 664 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L662-L664

Added lines #L662 - L664 were not covered by tests
}

Repository repository = repositoriesService.repository(repositoryName);
validate(repositoryName, snapshotName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;

private SnapshotId snapshotId;

private Snapshot snapshot;

boolean enteredLoop;

@Override
public ClusterState execute(ClusterState currentState) {
// move to in progress
snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(repositoryName);

if (repository.isReadOnly()) {
listener.onFailure(
new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository")
);
}

snapshot = new Snapshot(repositoryName, snapshotId);
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());

createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName);
Expand Down Expand Up @@ -776,59 +778,46 @@
pinnedTimestamp
);
final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(
new SnapshotException(
repositoryName,
snapshotName,
"Aborting snapshot-v2, no longer cluster manager"
)
);
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
failSnapshotCompletionListeners(

Check warning on line 794 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L794

Added line #L794 was not covered by tests
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(

Check warning on line 798 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L798

Added line #L798 was not covered by tests
new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager")
);
return;

Check warning on line 801 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L801

Added line #L801 was not covered by tests
}
listener.onResponse(snapshotInfo);
logger.info("created snapshot-v2 [{}] in repository [{}]", repositoryName, snapshotName);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e);
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
});
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
leaveRepoLoop(repositoryName);

Check warning on line 814 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L813-L814

Added lines #L813 - L814 were not covered by tests
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
}

Check warning on line 818 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L816-L818

Added lines #L816 - L818 were not covered by tests
}
);
}

@Override
Expand Down Expand Up @@ -916,30 +905,30 @@
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
}

private void updateSnapshotPinnedTimestamp(
RepositoryData repositoryData,
Snapshot snapshot,
long timestampToPin,
ActionListener<RepositoryData> listener
) {
private void updateSnapshotPinnedTimestamp(Snapshot snapshot, long timestampToPin) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
SetOnce<Exception> ex = new SetOnce<>();
ActionListener<Void> listener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
ex.set(e);
}

Check warning on line 921 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L919-L921

Added lines #L919 - L921 were not covered by tests
};
remoteStorePinnedTimestampService.pinTimestamp(
timestampToPin,
getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()),
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
listener.onResponse(repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
listener.onFailure(e);

}
}
new LatchedActionListener<>(listener, latch)
);
latch.await();
if (ex.get() != null) {
throw ex.get();

Check warning on line 930 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L930

Added line #L930 was not covered by tests
}
}

public static String getPinningEntity(String repositoryName, String snapshotUUID) {
Expand Down
Loading