Track shard snapshot progress during node shutdown (elastic#112567)
Track shard snapshot progress during shutdown to identify bottlenecks
that slow snapshots and can ultimately block shard re-allocation.

Relates ES-9086
DiannaHohensee authored Oct 8, 2024
1 parent 2ba9bc9 commit 84fe9cf
Showing 10 changed files with 1,087 additions and 52 deletions.
docs/changelog/112567.yaml (5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 112567
summary: Track shard snapshot progress during node shutdown
area: Snapshot/Restore
type: enhancement
issues: []
@@ -253,35 +253,17 @@ private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
}

/**
-     * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the one with the smallest size
* Index documents until all the shards are at least WATERMARK_BYTES in size.
* @return the shard sizes.
*/
private ShardSizes createReasonableSizedShards(final String indexName) {
-        while (true) {
-            indexRandom(false, indexName, scaledRandomIntBetween(100, 10000));
-            forceMerge();
-            refresh();
-
-            final ShardStats[] shardStates = indicesAdmin().prepareStats(indexName)
-                .clear()
-                .setStore(true)
-                .setTranslog(true)
-                .get()
-                .getShards();
-
-            var smallestShardSize = Arrays.stream(shardStates)
-                .mapToLong(it -> it.getStats().getStore().sizeInBytes())
-                .min()
-                .orElseThrow(() -> new AssertionError("no shards"));
-
-            if (smallestShardSize > WATERMARK_BYTES) {
-                var shardSizes = Arrays.stream(shardStates)
-                    .map(it -> new ShardSize(removeIndexUUID(it.getShardRouting().shardId()), it.getStats().getStore().sizeInBytes()))
-                    .sorted(Comparator.comparing(ShardSize::size))
-                    .toList();
-                logger.info("Created shards with sizes {}", shardSizes);
-                return new ShardSizes(shardSizes);
-            }
-        }
ShardStats[] shardStats = indexAllShardsToAnEqualOrGreaterMinimumSize(indexName, WATERMARK_BYTES);
var shardSizes = Arrays.stream(shardStats)
.map(it -> new ShardSize(removeIndexUUID(it.getShardRouting().shardId()), it.getStats().getStore().sizeInBytes()))
.sorted(Comparator.comparing(ShardSize::size))
.toList();
logger.info("Created shards with sizes {}", shardSizes);
return new ShardSizes(shardSizes);
}

private record ShardSizes(List<ShardSize> sizes) {
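The loop above was extracted into a helper whose body does not appear in this hunk. A plausible reconstruction within the same test class, inferred from the removed code; the name and arguments come from the call site above, while the body is an assumption, not the committed implementation:

    // Hypothetical sketch reconstructed from the removed loop; not the committed code.
    private ShardStats[] indexAllShardsToAnEqualOrGreaterMinimumSize(final String indexName, final long minimumSizeInBytes) {
        while (true) {
            indexRandom(false, indexName, scaledRandomIntBetween(100, 10000));
            forceMerge();
            refresh();

            final ShardStats[] shardStats = indicesAdmin().prepareStats(indexName)
                .clear()
                .setStore(true)
                .setTranslog(true)
                .get()
                .getShards();

            // Keep indexing until every shard has reached the requested minimum size.
            var smallestShardSize = Arrays.stream(shardStats)
                .mapToLong(it -> it.getStats().getStore().sizeInBytes())
                .min()
                .orElseThrow(() -> new AssertionError("no shards"));
            if (smallestShardSize >= minimumSizeInBytes) {
                return shardStats;
            }
        }
    }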

Large diffs are not rendered by default.
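The unrendered diff above is presumably the new SnapshotShutdownProgressTracker class itself. The skeleton below is inferred strictly from the call sites and the setting reference visible later on this page; the setting key, its default value, the field names, and all method bodies are assumptions:

    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.common.settings.Setting;
    import org.elasticsearch.core.TimeValue;
    import org.elasticsearch.index.shard.ShardId;
    import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
    import org.elasticsearch.snapshots.Snapshot;
    import org.elasticsearch.threadpool.ThreadPool;

    import java.util.function.Supplier;

    // Hypothetical skeleton; only the class name, the method names/parameters, and the
    // setting's field name appear on this page. Everything else is an assumption.
    public class SnapshotShutdownProgressTracker {

        // Registered in ClusterSettings below; the key and default value are assumptions.
        public static final Setting<TimeValue> SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING = Setting.timeSetting(
            "snapshots.shutdown.progress.interval",
            TimeValue.timeValueSeconds(5),
            Setting.Property.NodeScope,
            Setting.Property.Dynamic
        );

        private final Supplier<String> localNodeIdSupplier;
        private final ThreadPool threadPool;

        public SnapshotShutdownProgressTracker(Supplier<String> localNodeIdSupplier, ClusterSettings clusterSettings, ThreadPool threadPool) {
            this.localNodeIdSupplier = localNodeIdSupplier;
            this.threadPool = threadPool;
            // Presumably subscribes to the dynamic log-interval setting here
            // (see the sketch after the ClusterSettings hunk below).
        }

        // The local node entered a shutdown mode that pauses snapshots: start periodic progress logging.
        public void onClusterStateAddShutdown() {}

        // The shutdown marker was removed before the node stopped: stop logging and reset.
        public void onClusterStateRemoveShutdown() {}

        // The cluster state pausing all local shard snapshots has been applied.
        public void onClusterStatePausingSetForAllShardSnapshots() {}

        // Bracket each running shard snapshot so stalled ones can be reported.
        public void incNumberOfShardSnapshotsInProgress(ShardId shardId, Snapshot snapshot) {}

        public void decNumberOfShardSnapshotsInProgress(ShardId shardId, Snapshot snapshot, IndexShardSnapshotStatus snapshotStatus) {}

        // Bracket status-update requests to the master so slow acknowledgements can be reported.
        public void trackRequestSentToMaster(Snapshot snapshot, ShardId shardId) {}

        public void releaseRequestSentToMaster(Snapshot snapshot, ShardId shardId) {}
    }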

@@ -628,6 +628,11 @@ public List<DiscoveryNode> addedNodes() {
return added;
}

@Override
public String toString() {
return shortSummary();
}

public String shortSummary() {
final StringBuilder summary = new StringBuilder();
if (masterNodeChanged()) {
@@ -122,6 +122,7 @@
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ProxyConnectionStrategy;
@@ -368,6 +369,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING,
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
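Registering the setting here makes it a dynamic cluster setting, so the logging interval can be changed at runtime. One plausible way the tracker consumes it in its constructor, which receives ClusterSettings in the SnapshotShardsService diff below (the field name is hypothetical; addSettingsUpdateConsumer would serve equally well):

    // Hypothetical wiring in the tracker's constructor: keep the log interval in
    // sync with the dynamic cluster setting (field name is an assumption).
    clusterSettings.initializeAndWatch(
        SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING,
        interval -> this.progressLoggerInterval = interval
    );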
@@ -184,6 +184,10 @@ public synchronized void moveToDone(final long endTime, final ShardSnapshotResult
}
}

public Stage getStage() {
return stage.get();
}

public void addAbortListener(ActionListener<AbortStatus> listener) {
abortListeners.addListener(listener);
}
@@ -429,4 +433,31 @@ public String toString() {
+ ')';
}
}

@Override
public String toString() {
return "index shard snapshot status ("
+ "stage="
+ stage
+ ", startTime="
+ startTime
+ ", totalTime="
+ totalTime
+ ", incrementalFileCount="
+ incrementalFileCount
+ ", totalFileCount="
+ totalFileCount
+ ", processedFileCount="
+ processedFileCount
+ ", incrementalSize="
+ incrementalSize
+ ", totalSize="
+ totalSize
+ ", processedSize="
+ processedSize
+ ", failure='"
+ failure
+ '\''
+ ')';
}
}
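Together, getStage() and the expanded toString() give the shutdown progress tracker something concrete to log about shards that stop without finishing. A hedged sketch of the kind of tracker-side logging this enables; the method, field names, and message shape are assumptions, while getStage(), Stage, and the toString() format come from the diff above:

    // Hypothetical helper inside the tracker; 'logger' is assumed to be the tracker's own logger.
    private void logShardSnapshotOutcome(Snapshot snapshot, ShardId shardId, IndexShardSnapshotStatus status) {
        if (status.getStage() == IndexShardSnapshotStatus.Stage.DONE) {
            logger.debug("[{}][{}] shard snapshot completed before shutdown", snapshot, shardId);
        } else {
            // The expanded toString() reports stage, timing, file counts, and sizes:
            // enough detail to spot a shard snapshot that is stuck or progressing slowly.
            logger.info("[{}][{}] shard snapshot has not completed: {}", snapshot, shardId, status);
        }
    }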
@@ -21,6 +21,8 @@
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@@ -82,6 +84,8 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl

private final ThreadPool threadPool;

private final SnapshotShutdownProgressTracker snapshotShutdownProgressTracker;

private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();

// A map of snapshots to the shardIds that we already reported to the master as failed
@@ -102,6 +106,11 @@ public SnapshotShardsService(
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = transportService.getThreadPool();
this.snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker(
() -> clusterService.state().nodes().getLocalNodeId(),
clusterService.getClusterSettings(),
threadPool
);
this.remoteFailedRequestDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
if (DiscoveryNode.canContainData(settings)) {
// this is only useful on the nodes that can hold data
@@ -130,11 +139,38 @@ protected void doClose() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
final var localNodeId = clusterService.localNode().getId();

// Track when this node enters and leaves shutdown mode because we pause shard snapshots for shutdown.
// The snapshotShutdownProgressTracker will report (via logging) on the progress shard snapshots make
// towards either completing (successfully or otherwise) or pausing.
NodesShutdownMetadata currentShutdownMetadata = event.state().metadata().custom(NodesShutdownMetadata.TYPE);
NodesShutdownMetadata previousShutdownMetadata = event.previousState().metadata().custom(NodesShutdownMetadata.TYPE);
SingleNodeShutdownMetadata currentLocalNodeShutdownMetadata = currentShutdownMetadata != null
? currentShutdownMetadata.get(localNodeId)
: null;
SingleNodeShutdownMetadata previousLocalNodeShutdownMetadata = previousShutdownMetadata != null
? previousShutdownMetadata.get(localNodeId)
: null;

boolean isLocalNodeAddingShutdown = false;
if (isPausingProgressTrackedShutdown(previousLocalNodeShutdownMetadata) == false
&& isPausingProgressTrackedShutdown(currentLocalNodeShutdownMetadata)) {
snapshotShutdownProgressTracker.onClusterStateAddShutdown();
isLocalNodeAddingShutdown = true;
} else if (isPausingProgressTrackedShutdown(previousLocalNodeShutdownMetadata)
&& isPausingProgressTrackedShutdown(currentLocalNodeShutdownMetadata) == false) {
snapshotShutdownProgressTracker.onClusterStateRemoveShutdown();
}

final var currentSnapshots = SnapshotsInProgress.get(event.state());

if (SnapshotsInProgress.get(event.previousState()).equals(currentSnapshots) == false) {
-                final var localNodeId = clusterService.localNode().getId();
synchronized (shardSnapshots) {
// Cancel any snapshots that have been removed from the cluster state.
cancelRemoved(currentSnapshots);

// Update running snapshots or start any snapshots that are set to run.
for (final var oneRepoSnapshotsInProgress : currentSnapshots.entriesByRepo()) {
for (final var snapshotsInProgressEntry : oneRepoSnapshotsInProgress) {
handleUpdatedSnapshotsInProgressEntry(
@@ -147,6 +183,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

if (isLocalNodeAddingShutdown) {
// Any active snapshots would have been signalled to pause in the previous code block.
snapshotShutdownProgressTracker.onClusterStatePausingSetForAllShardSnapshots();
}

String previousMasterNodeId = event.previousState().nodes().getMasterNodeId();
String currentMasterNodeId = event.state().nodes().getMasterNodeId();
if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) {
@@ -164,6 +205,17 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

/**
* Determines whether we want to track this kind of shutdown for snapshot pausing progress.
* We want tracking if shutdown metadata is set and it is not of type RESTART.
* Note that the Shutdown API is idempotent, and the type of shutdown may change between RESTART and some other type of interest.
*
* @return true if snapshots will be paused during this type of local node shutdown.
*/
private static boolean isPausingProgressTrackedShutdown(@Nullable SingleNodeShutdownMetadata localNodeShutdownMetadata) {
return localNodeShutdownMetadata != null && localNodeShutdownMetadata.getType() != SingleNodeShutdownMetadata.Type.RESTART;
}

@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
// abort any snapshots occurring on the soon-to-be closed shard
@@ -231,6 +283,9 @@ private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) {
}
}

/**
* Starts new snapshots and pauses or aborts active shard snapshots based on the updated {@link SnapshotsInProgress} entry.
*/
private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean removingLocalNode, SnapshotsInProgress.Entry entry) {
if (entry.isClone()) {
// This is a snapshot clone, it will be executed on the current master
@@ -364,8 +419,7 @@ private Runnable newShardSnapshotTask(
final IndexVersion entryVersion,
final long entryStartTime
) {
-        // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry
-        return () -> snapshot(shardId, snapshot, indexId, snapshotStatus, entryVersion, entryStartTime, new ActionListener<>() {
ActionListener<ShardSnapshotResult> snapshotResultListener = new ActionListener<>() {
@Override
public void onResponse(ShardSnapshotResult shardSnapshotResult) {
final ShardGeneration newGeneration = shardSnapshotResult.getGeneration();
@@ -405,7 +459,15 @@ public void onFailure(Exception e) {
final var shardState = snapshotStatus.moveToUnsuccessful(nextStage, failure, threadPool.absoluteTimeInMillis());
notifyUnsuccessfulSnapshotShard(snapshot, shardId, shardState, failure, snapshotStatus.generation());
}
};

snapshotShutdownProgressTracker.incNumberOfShardSnapshotsInProgress(shardId, snapshot);
var decTrackerRunsBeforeResultListener = ActionListener.runAfter(snapshotResultListener, () -> {
snapshotShutdownProgressTracker.decNumberOfShardSnapshotsInProgress(shardId, snapshot, snapshotStatus);
});

// separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry
return () -> snapshot(shardId, snapshot, indexId, snapshotStatus, entryVersion, entryStartTime, decTrackerRunsBeforeResultListener);
}
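The increment happens before the task is scheduled, and ActionListener.runAfter guarantees the matching decrement runs once the wrapped listener completes on either path, success or failure. A minimal standalone illustration of that contract, using toy listeners rather than the production ones:

    // Toy demonstration of ActionListener.runAfter: the Runnable fires after either completion path.
    ActionListener<String> inner = ActionListener.wrap(
        response -> System.out.println("shard snapshot done: " + response),
        e -> System.out.println("shard snapshot failed: " + e.getMessage())
    );

    ActionListener<String> tracked = ActionListener.runAfter(inner, () -> System.out.println("tracker decremented"));
    tracked.onResponse("ok"); // prints "shard snapshot done: ok", then "tracker decremented"

    tracked = ActionListener.runAfter(inner, () -> System.out.println("tracker decremented"));
    tracked.onFailure(new RuntimeException("disk full")); // prints "shard snapshot failed: disk full", then "tracker decremented"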

// package private for testing
@@ -665,19 +727,25 @@ private void notifyUnsuccessfulSnapshotShard(

/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
ActionListener<Void> updateResultListener = new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
}
};
snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId);
var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> {
snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId);
});

remoteFailedRequestDeduplicator.executeOnce(
new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
-            new ActionListener<>() {
-                @Override
-                public void onResponse(Void aVoid) {
-                    logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
-                }
-            },
releaseTrackerRequestRunsBeforeResultListener,
(req, reqListener) -> transportService.sendRequest(
transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
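Here the tracker wraps the listener with ActionListener.runBefore rather than runAfter: the release executes before the wrapped listener's own completion handling, presumably so the tracker stops counting the outstanding master request as soon as a response or failure arrives, regardless of what the logging in updateResultListener does. The contrast in a toy sketch:

    // Toy demonstration of ActionListener.runBefore: the task fires ahead of the
    // wrapped listener, on both the success and the failure path.
    ActionListener<Void> acked = ActionListener.wrap(
        r -> System.out.println("master ack handled"),
        e -> System.out.println("master update failed: " + e.getMessage())
    );

    ActionListener<Void> released = ActionListener.runBefore(acked, () -> System.out.println("tracker entry released"));
    released.onResponse(null); // prints "tracker entry released", then "master ack handled"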