Skip to content

Commit

Permalink
[Snapshot V2] Move timestamp pinning before cluster state update (#16269
Browse files Browse the repository at this point in the history
)

* Move timestamp pinning before cluster state update

Signed-off-by: Gaurav Bafna <[email protected]>

* Address PR Comments

Signed-off-by: Gaurav Bafna <[email protected]>

* Fix IT

Signed-off-by: Gaurav Bafna <[email protected]>

---------

Signed-off-by: Gaurav Bafna <[email protected]>
(cherry picked from commit b5dcde3)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 15, 2024
1 parent 4ecd159 commit e32454c
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -938,17 +938,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-concurrent-" + snapshotIndex;
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get();
logger.info("Snapshot completed {}", snapshotName);
} catch (Exception e) {}
});
threads.add(thread);
Expand All @@ -963,15 +954,19 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio
thread.join();
}

// Validate that only one snapshot has been created
// Sleeping 10 sec for earlier created snapshot to complete runNextQueuedOperation and be ready for next snapshot
// We can't put `waitFor` since we don't have visibility on its completion
Thread.sleep(TimeValue.timeValueSeconds(10).seconds());
client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, "snapshot-cleanup-timestamp").setWaitForCompletion(true).get();
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1));
forceSyncPinnedTimestamps();
assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size());
assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(2));
waitUntil(() -> {
forceSyncPinnedTimestamps();
return RemoteStorePinnedTimestampService.getPinnedEntities().size() == repositoryData.getSnapshotIds().size();
});
}

public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception {
Expand Down
149 changes: 69 additions & 80 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
Expand Down Expand Up @@ -649,34 +650,35 @@ public TimeValue timeout() {
* @param listener snapshot creation listener
*/
public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
long pinnedTimestamp = System.currentTimeMillis();
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);

final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
long pinnedTimestamp = System.currentTimeMillis();
try {
updateSnapshotPinnedTimestamp(snapshot, pinnedTimestamp);
} catch (Exception e) {
listener.onFailure(e);
return;

Check warning on line 664 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L662-L664

Added lines #L662 - L664 were not covered by tests
}

Repository repository = repositoriesService.repository(repositoryName);
validate(repositoryName, snapshotName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;

private SnapshotId snapshotId;

private Snapshot snapshot;

boolean enteredLoop;

@Override
public ClusterState execute(ClusterState currentState) {
// move to in progress
snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(repositoryName);

if (repository.isReadOnly()) {
listener.onFailure(
new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository")
);
}

snapshot = new Snapshot(repositoryName, snapshotId);
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());

createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName);
Expand Down Expand Up @@ -776,59 +778,46 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
pinnedTimestamp
);
final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null);
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
failSnapshotCompletionListeners(
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(
new SnapshotException(
repositoryName,
snapshotName,
"Aborting snapshot-v2, no longer cluster manager"
)
);
return;
}
listener.onResponse(snapshotInfo);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()),
snapshotInfo,
version,
state -> stateWithoutSnapshot(state, snapshot),
Priority.IMMEDIATE,
new ActionListener<RepositoryData>() {
@Override
public void onResponse(RepositoryData repositoryData) {
if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) {
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
failSnapshotCompletionListeners(

Check warning on line 794 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L794

Added line #L794 was not covered by tests
snapshot,
new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager")
);
listener.onFailure(

Check warning on line 798 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L798

Added line #L798 was not covered by tests
new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager")
);
return;

Check warning on line 801 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L801

Added line #L801 was not covered by tests
}
listener.onResponse(snapshotInfo);
logger.info("created snapshot-v2 [{}] in repository [{}]", repositoryName, snapshotName);
// For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations
// can get queued . This is triggering them.
runNextQueuedOperation(repositoryData, repositoryName, true);
cleanOrphanTimestamp(repositoryName, repositoryData);
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e);
leaveRepoLoop(repositoryName);
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
});
updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener);

@Override
public void onFailure(Exception e) {
logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName);
leaveRepoLoop(repositoryName);

Check warning on line 814 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L813-L814

Added lines #L813 - L814 were not covered by tests
// cleaning up in progress snapshot here
stateWithoutSnapshotV2(newState);
listener.onFailure(e);
}

Check warning on line 818 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L816-L818

Added lines #L816 - L818 were not covered by tests
}
);
}

@Override
Expand Down Expand Up @@ -916,30 +905,30 @@ private void createSnapshotPreValidations(
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
}

private void updateSnapshotPinnedTimestamp(
RepositoryData repositoryData,
Snapshot snapshot,
long timestampToPin,
ActionListener<RepositoryData> listener
) {
private void updateSnapshotPinnedTimestamp(Snapshot snapshot, long timestampToPin) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
SetOnce<Exception> ex = new SetOnce<>();
ActionListener<Void> listener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
ex.set(e);
}

Check warning on line 921 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L919-L921

Added lines #L919 - L921 were not covered by tests
};
remoteStorePinnedTimestampService.pinTimestamp(
timestampToPin,
getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()),
new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName());
listener.onResponse(repositoryData);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e);
listener.onFailure(e);

}
}
new LatchedActionListener<>(listener, latch)
);
latch.await();
if (ex.get() != null) {
throw ex.get();

Check warning on line 930 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L930

Added line #L930 was not covered by tests
}
}

public static String getPinningEntity(String repositoryName, String snapshotUUID) {
Expand Down

0 comments on commit e32454c

Please sign in to comment.