From 84fe9cf3a303c8c9477abe16c1783cd319e2c89f Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 8 Oct 2024 22:59:37 +0200 Subject: [PATCH] Track shard snapshot progress during node shutdown (#112567) Track shard snapshot progress during shutdown to identify any bottlenecks that cause slowness that can ultimately block shard re-allocation. Relates ES-9086 --- docs/changelog/112567.yaml | 5 + .../decider/DiskThresholdDeciderIT.java | 36 +- .../snapshots/SnapshotShutdownIT.java | 255 ++++++++++- .../cluster/node/DiscoveryNodes.java | 5 + .../common/settings/ClusterSettings.java | 2 + .../snapshots/IndexShardSnapshotStatus.java | 31 ++ .../snapshots/SnapshotShardsService.java | 96 ++++- .../SnapshotShutdownProgressTracker.java | 270 ++++++++++++ .../SnapshotShutdownProgressTrackerTests.java | 407 ++++++++++++++++++ .../elasticsearch/test/ESIntegTestCase.java | 32 ++ 10 files changed, 1087 insertions(+), 52 deletions(-) create mode 100644 docs/changelog/112567.yaml create mode 100644 server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java create mode 100644 server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java diff --git a/docs/changelog/112567.yaml b/docs/changelog/112567.yaml new file mode 100644 index 0000000000000..25e3ac8360c2b --- /dev/null +++ b/docs/changelog/112567.yaml @@ -0,0 +1,5 @@ +pr: 112567 +summary: Track shard snapshot progress during node shutdown +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 2a275cf563d86..19b0f0bd73233 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -253,35 +253,17 @@ private Set getShardIds(final String nodeId, final String indexName) { } /** - * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the one with the smallest size + * Index documents until all the shards are at least WATERMARK_BYTES in size. + * @return the shard sizes. 
*/ private ShardSizes createReasonableSizedShards(final String indexName) { - while (true) { - indexRandom(false, indexName, scaledRandomIntBetween(100, 10000)); - forceMerge(); - refresh(); - - final ShardStats[] shardStates = indicesAdmin().prepareStats(indexName) - .clear() - .setStore(true) - .setTranslog(true) - .get() - .getShards(); - - var smallestShardSize = Arrays.stream(shardStates) - .mapToLong(it -> it.getStats().getStore().sizeInBytes()) - .min() - .orElseThrow(() -> new AssertionError("no shards")); - - if (smallestShardSize > WATERMARK_BYTES) { - var shardSizes = Arrays.stream(shardStates) - .map(it -> new ShardSize(removeIndexUUID(it.getShardRouting().shardId()), it.getStats().getStore().sizeInBytes())) - .sorted(Comparator.comparing(ShardSize::size)) - .toList(); - logger.info("Created shards with sizes {}", shardSizes); - return new ShardSizes(shardSizes); - } - } + ShardStats[] shardStats = indexAllShardsToAnEqualOrGreaterMinimumSize(indexName, WATERMARK_BYTES); + var shardSizes = Arrays.stream(shardStats) + .map(it -> new ShardSize(removeIndexUUID(it.getShardRouting().shardId()), it.getStats().getStore().sizeInBytes())) + .sorted(Comparator.comparing(ShardSize::size)) + .toList(); + logger.info("Created shards with sizes {}", shardSizes); + return new ShardSizes(shardSizes); } private record ShardSizes(List sizes) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index 3c71b50321c76..980ef2a87c9c2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.snapshots; +import org.apache.logging.log4j.Level; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; @@ -33,19 +34,26 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import java.util.Collection; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.elasticsearch.snapshots.SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; @@ -55,15 +63,44 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase { private static final String REQUIRE_NODE_NAME_SETTING = IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name"; + private MockLog mockLog; + + public void setUp() throws Exception { + super.setUp(); + mockLog = 
MockLog.capture(SnapshotShutdownProgressTracker.class); + } + + private void resetMockLog() { + mockLog.close(); + mockLog = MockLog.capture(SnapshotShutdownProgressTracker.class); + } + + public void tearDown() throws Exception { + mockLog.close(); + super.tearDown(); + } + @Override protected Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); } + /** + * Tests that shard snapshots on a node with RESTART shutdown metadata will finish on the same node. + */ + @TestLogging( + value = "org.elasticsearch.snapshots.SnapshotShutdownProgressTracker:DEBUG", + reason = "Testing SnapshotShutdownProgressTracker's progress, which is reported at the DEBUG logging level" + ) public void testRestartNodeDuringSnapshot() throws Exception { // Marking a node for restart has no impact on snapshots (see #71333 for how to handle this case) internalCluster().ensureAtLeastNumDataNodes(1); - final var originalNode = internalCluster().startDataOnlyNode(); + final var originalNode = internalCluster().startDataOnlyNode( + // Speed up the logging frequency, so that the test doesn't have to wait too long to check for log messages. + Settings.builder().put(SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)).build() + ); + final String originalNodeId = internalCluster().getInstance(NodeEnvironment.class, originalNode).nodeId(); + final var indexName = randomIdentifier(); createIndexWithContent(indexName, indexSettings(1, 0).put(REQUIRE_NODE_NAME_SETTING, originalNode).build()); @@ -88,6 +125,16 @@ public void testRestartNodeDuringSnapshot() throws Exception { }); addUnassignedShardsWatcher(clusterService, indexName); + // Ensure that the SnapshotShutdownProgressTracker does not start logging in RESTART mode. + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "SnapshotShutdownProgressTracker start log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.DEBUG, + "Starting shutdown snapshot progress logging on node [" + originalNodeId + "]" + ) + ); + safeAwait( (ActionListener listener) -> putShutdownMetadata( clusterService, @@ -100,9 +147,15 @@ public void testRestartNodeDuringSnapshot() throws Exception { ) ); assertFalse(snapshotCompletesWithoutPausingListener.isDone()); + + // Verify no SnapshotShutdownProgressTracker logging in RESTART mode. 
+ mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + unblockAllDataNodes(repoName); // lets the shard snapshot continue so the snapshot can succeed assertEquals(SnapshotState.SUCCESS, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state()); safeAwait(snapshotCompletesWithoutPausingListener); + clearShutdownMetadata(clusterService); } @@ -117,7 +170,7 @@ public void testRemoveNodeDuringSnapshot() throws Exception { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var snapshotFuture = startFullSnapshotBlockedOnDataNode(randomIdentifier(), repoName, originalNode); - final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName); + final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName, 1); addUnassignedShardsWatcher(clusterService, indexName); updateIndexSettings(Settings.builder().putNull(REQUIRE_NODE_NAME_SETTING), indexName); @@ -146,7 +199,7 @@ public void testRemoveNodeAndFailoverMasterDuringSnapshot() throws Exception { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var snapshotFuture = startFullSnapshotBlockedOnDataNode(randomIdentifier(), repoName, originalNode); - final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName); + final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName, 1); addUnassignedShardsWatcher(clusterService, indexName); final var snapshotStatusUpdateBarrier = new CyclicBarrier(2); @@ -264,7 +317,7 @@ public void testRemoveNodeDuringSnapshotWithOtherRunningShardSnapshots() throws final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var snapshotFuture = startFullSnapshotBlockedOnDataNode(randomIdentifier(), repoName, nodeForRemoval); - final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName); + final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName, 1); addUnassignedShardsWatcher(clusterService, indexName); waitForBlock(otherNode, repoName); @@ -320,7 +373,7 @@ public void testStartRemoveNodeButDoNotComplete() throws Exception { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); final var snapshotFuture = startFullSnapshotBlockedOnDataNode(randomIdentifier(), repoName, primaryNode); - final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName); + final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName, 1); addUnassignedShardsWatcher(clusterService, indexName); putShutdownForRemovalMetadata(primaryNode, clusterService); @@ -334,6 +387,9 @@ public void testStartRemoveNodeButDoNotComplete() throws Exception { assertEquals(SnapshotState.SUCCESS, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state()); } + /** + * Tests that deleting a snapshot will abort paused shard snapshots on a node with shutdown metadata. 
+ */ public void testAbortSnapshotWhileRemovingNode() throws Exception { final var primaryNode = internalCluster().startDataOnlyNode(); final var indexName = randomIdentifier(); @@ -363,7 +419,7 @@ public void testAbortSnapshotWhileRemovingNode() throws Exception { final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); addUnassignedShardsWatcher(clusterService, indexName); putShutdownForRemovalMetadata(primaryNode, clusterService); - unblockAllDataNodes(repoName); // lets the shard snapshot abort, but allocation filtering stops it from moving + unblockAllDataNodes(repoName); // lets the shard snapshot pause, but allocation filtering stops it from moving safeAwait(updateSnapshotStatusBarrier); // wait for data node to notify master that the shard snapshot is paused // abort snapshot (and wait for the abort to land in the cluster state) @@ -414,10 +470,180 @@ public void testShutdownWhileSuccessInFlight() throws Exception { clearShutdownMetadata(clusterService); } + /** + * This test exercises the SnapshotShutdownProgressTracker's log messages reporting the progress of shard snapshots on data nodes. + */ + @TestLogging( + value = "org.elasticsearch.snapshots.SnapshotShutdownProgressTracker:TRACE", + reason = "Testing SnapshotShutdownProgressTracker's progress, which is reported at the TRACE logging level" + ) + public void testSnapshotShutdownProgressTracker() throws Exception { + final var repoName = randomIdentifier(); + final int numShards = randomIntBetween(1, 10); + createRepository(repoName, "mock"); + + // Create another index on another node which will be blocked (remain in state INIT) throughout. + // Not required for this test, just adds some more concurrency. + final var otherNode = internalCluster().startDataOnlyNode(); + final var otherIndex = randomIdentifier(); + createIndexWithContent(otherIndex, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, otherNode).build()); + blockDataNode(repoName, otherNode); + + final var nodeForRemoval = internalCluster().startDataOnlyNode( + // Speed up the logging frequency, so that the test doesn't have to wait too long to check for log messages. + Settings.builder().put(SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)).build() + ); + final String nodeForRemovalId = internalCluster().getInstance(NodeEnvironment.class, nodeForRemoval).nodeId(); + final var indexName = randomIdentifier(); + createIndexWithContent(indexName, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, nodeForRemoval).build()); + indexAllShardsToAnEqualOrGreaterMinimumSize(indexName, new ByteSizeValue(2, ByteSizeUnit.KB).getBytes()); + + // Start the snapshot with blocking in place on the data node not to allow shard snapshots to finish yet. 
+ final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var snapshotFuture = startFullSnapshotBlockedOnDataNode(randomIdentifier(), repoName, nodeForRemoval); + final var snapshotPausedListener = createSnapshotPausedListener(clusterService, repoName, indexName, numShards); + addUnassignedShardsWatcher(clusterService, indexName); + + waitForBlock(otherNode, repoName); + + logger.info("---> nodeForRemovalId: " + nodeForRemovalId + ", numShards: " + numShards); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker start log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.DEBUG, + "Starting shutdown snapshot progress logging on node [" + nodeForRemovalId + "]" + ) + ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker pause set log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.DEBUG, + "Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]" + ) + ); + + putShutdownForRemovalMetadata(nodeForRemoval, clusterService); + + // Check that the SnapshotShutdownProgressTracker was turned on after the shutdown metadata is set above. + mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker running number of snapshots", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "*Number shard snapshots running [" + numShards + "].*" + ) + ); + + // Check that the SnapshotShutdownProgressTracker is tracking the active (not yet paused) shard snapshots. + mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + + // Block on the master when a shard snapshot request comes in, until we can verify that the Tracker saw the outgoing request. + final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1); + final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName()); + masterTransportService.addRequestHandlingBehavior( + SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> { + safeAwait(snapshotStatusUpdateLatch); + try { + handler.messageReceived(request, channel, task); + } catch (Exception e) { + fail(e); + } + }) + ); + + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker shard snapshot has paused log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "*Number shard snapshots waiting for master node reply to status update request [" + numShards + "]*" + ) + ); + + // Let the shard snapshot proceed. It will still get stuck waiting for the master node to respond. + unblockNode(repoName, nodeForRemoval); + + // Check that the SnapshotShutdownProgressTracker observed the request sent to the master node. + mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker shard snapshot has paused log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]" + ) + ); + + // Release the master node to respond + snapshotStatusUpdateLatch.countDown(); + + // Wait for the snapshot to fully pause. 
+ safeAwait(snapshotPausedListener); + + // Check that the SnapshotShutdownProgressTracker observed the shard snapshot finishing as paused. + mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + + // Remove the allocation filter so that the shard moves off of the node shutting down. + updateIndexSettings(Settings.builder().putNull(REQUIRE_NODE_NAME_SETTING), indexName); + + // Wait for the shard snapshot to succeed on the non-shutting down node. + safeAwait( + ClusterServiceUtils.addTemporaryStateListener( + clusterService, + state -> SnapshotsInProgress.get(state) + .asStream() + .allMatch( + e -> e.shards() + .entrySet() + .stream() + .anyMatch( + shardEntry -> shardEntry.getKey().getIndexName().equals(indexName) + && switch (shardEntry.getValue().state()) { + case INIT, PAUSED_FOR_NODE_REMOVAL -> false; + case SUCCESS -> true; + case FAILED, ABORTED, MISSING, QUEUED, WAITING -> throw new AssertionError(shardEntry.toString()); + } + ) + ) + ) + ); + + unblockAllDataNodes(repoName); + + // Snapshot completes when the node vacates even though it hasn't been removed yet + assertEquals(SnapshotState.SUCCESS, snapshotFuture.get(10, TimeUnit.SECONDS).getSnapshotInfo().state()); + + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker cancelled log message", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.DEBUG, + "Cancelling shutdown snapshot progress logging on node [" + nodeForRemovalId + "]" + ) + ); + + clearShutdownMetadata(clusterService); + + // Check that the SnapshotShutdownProgressTracker logging was cancelled by the removal of the shutdown metadata. + mockLog.awaitAllExpectationsMatched(); + resetMockLog(); + } + private static SubscribableListener createSnapshotPausedListener( ClusterService clusterService, String repoName, - String indexName + String indexName, + int numShards ) { return ClusterServiceUtils.addTemporaryStateListener(clusterService, state -> { final var entriesForRepo = SnapshotsInProgress.get(state).forRepo(repoName); @@ -434,10 +660,17 @@ private static SubscribableListener createSnapshotPausedListener( .stream() .flatMap(e -> e.getKey().getIndexName().equals(indexName) ? 
Stream.of(e.getValue()) : Stream.of()) .toList(); - assertThat(shardSnapshotStatuses, hasSize(1)); - final var shardState = shardSnapshotStatuses.iterator().next().state(); - assertThat(shardState, oneOf(SnapshotsInProgress.ShardState.INIT, SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL)); - return shardState == SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL; + assertThat(shardSnapshotStatuses, hasSize(numShards)); + for (var shardStatus : shardSnapshotStatuses) { + assertThat( + shardStatus.state(), + oneOf(SnapshotsInProgress.ShardState.INIT, SnapshotsInProgress.ShardState.PAUSED_FOR_NODE_REMOVAL) + ); + if (shardStatus.state() == SnapshotsInProgress.ShardState.INIT) { + return false; + } + } + return true; }); } diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index a7ae17c8dac14..9477f9c6a5cc1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -628,6 +628,11 @@ public List addedNodes() { return added; } + @Override + public String toString() { + return shortSummary(); + } + public String shortSummary() { final StringBuilder summary = new StringBuilder(); if (masterNodeChanged()) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index fbce913dac139..69be30b0b5111 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -122,6 +122,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotShutdownProgressTracker; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ProxyConnectionStrategy; @@ -368,6 +369,7 @@ public void apply(Settings value, Settings current, Settings previous) { SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, + SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING, NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index b5607c31641d3..70ba9950f7689 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -184,6 +184,10 @@ public synchronized void moveToDone(final long endTime, final ShardSnapshotResul } } + public Stage getStage() { + return stage.get(); + } + public void addAbortListener(ActionListener listener) { abortListeners.addListener(listener); } @@ -429,4 +433,31 @@ public String toString() { + ')'; } } + + @Override + public String toString() { + return "index shard snapshot status (" + + "stage=" + 
+ stage + + ", startTime=" + + startTime + + ", totalTime=" + + totalTime + + ", incrementalFileCount=" + + incrementalFileCount + + ", totalFileCount=" + + totalFileCount + + ", processedFileCount=" + + processedFileCount + + ", incrementalSize=" + + incrementalSize + + ", totalSize=" + + totalSize + + ", processedSize=" + + processedSize + + ", failure='" + + failure + + '\'' + + ')'; + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index abc5f36eef7da..7b2066f243771 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -21,6 +21,8 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -82,6 +84,8 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl private final ThreadPool threadPool; + private final SnapshotShutdownProgressTracker snapshotShutdownProgressTracker; + private final Map> shardSnapshots = new HashMap<>(); // A map of snapshots to the shardIds that we already reported to the master as failed @@ -102,6 +106,11 @@ public SnapshotShardsService( this.transportService = transportService; this.clusterService = clusterService; this.threadPool = transportService.getThreadPool(); + this.snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker( + () -> clusterService.state().nodes().getLocalNodeId(), + clusterService.getClusterSettings(), + threadPool + ); this.remoteFailedRequestDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext()); if (DiscoveryNode.canContainData(settings)) { // this is only useful on the nodes that can hold data @@ -130,11 +139,38 @@ protected void doClose() { @Override public void clusterChanged(ClusterChangedEvent event) { try { + final var localNodeId = clusterService.localNode().getId(); + + // Track when this node enters and leaves shutdown mode because we pause shard snapshots for shutdown. + // The snapshotShutdownProgressTracker will report (via logging) on the progress shard snapshots make + // towards either completing (successfully or otherwise) or pausing. + NodesShutdownMetadata currentShutdownMetadata = event.state().metadata().custom(NodesShutdownMetadata.TYPE); + NodesShutdownMetadata previousShutdownMetadata = event.previousState().metadata().custom(NodesShutdownMetadata.TYPE); + SingleNodeShutdownMetadata currentLocalNodeShutdownMetadata = currentShutdownMetadata != null + ? currentShutdownMetadata.get(localNodeId) + : null; + SingleNodeShutdownMetadata previousLocalNodeShutdownMetadata = previousShutdownMetadata != null + ? 
previousShutdownMetadata.get(localNodeId)
+            : null;
+
+        boolean isLocalNodeAddingShutdown = false;
+        if (isPausingProgressTrackedShutdown(previousLocalNodeShutdownMetadata) == false
+            && isPausingProgressTrackedShutdown(currentLocalNodeShutdownMetadata)) {
+            snapshotShutdownProgressTracker.onClusterStateAddShutdown();
+            isLocalNodeAddingShutdown = true;
+        } else if (isPausingProgressTrackedShutdown(previousLocalNodeShutdownMetadata)
+            && isPausingProgressTrackedShutdown(currentLocalNodeShutdownMetadata) == false) {
+                snapshotShutdownProgressTracker.onClusterStateRemoveShutdown();
+            }
+
         final var currentSnapshots = SnapshotsInProgress.get(event.state());
+
         if (SnapshotsInProgress.get(event.previousState()).equals(currentSnapshots) == false) {
-            final var localNodeId = clusterService.localNode().getId();
             synchronized (shardSnapshots) {
+                // Cancel any snapshots that have been removed from the cluster state.
                 cancelRemoved(currentSnapshots);
+
+                // Update running snapshots or start any snapshots that are set to run.
                 for (final var oneRepoSnapshotsInProgress : currentSnapshots.entriesByRepo()) {
                     for (final var snapshotsInProgressEntry : oneRepoSnapshotsInProgress) {
                         handleUpdatedSnapshotsInProgressEntry(
@@ -147,6 +183,11 @@ public void clusterChanged(ClusterChangedEvent event) {
                 }
             }
 
+            if (isLocalNodeAddingShutdown) {
+                // Any active snapshots would have been signalled to pause in the previous code block.
+                snapshotShutdownProgressTracker.onClusterStatePausingSetForAllShardSnapshots();
+            }
+
             String previousMasterNodeId = event.previousState().nodes().getMasterNodeId();
             String currentMasterNodeId = event.state().nodes().getMasterNodeId();
             if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) {
@@ -164,6 +205,17 @@ public void clusterChanged(ClusterChangedEvent event) {
         }
     }
 
+    /**
+     * Determines whether we want to track this kind of shutdown for snapshot pausing progress.
+     * We track the shutdown if shutdown metadata is set for the local node and its type is not RESTART.
+     * Note that the Shutdown API is idempotent, and the type of shutdown may change to or from RESTART and another type of interest.
+     *
+     * @return true if snapshots will be paused during this type of local node shutdown.
+     */
+    private static boolean isPausingProgressTrackedShutdown(@Nullable SingleNodeShutdownMetadata localNodeShutdownMetadata) {
+        return localNodeShutdownMetadata != null && localNodeShutdownMetadata.getType() != SingleNodeShutdownMetadata.Type.RESTART;
+    }
+
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
         // abort any snapshots occurring on the soon-to-be closed shard
@@ -231,6 +283,9 @@ private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) {
         }
     }
 
+    /**
+     * Starts new snapshots and pauses or aborts active shard snapshots based on the updated {@link SnapshotsInProgress} entry.
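+     * When the local node is marked for removal, running shard snapshots are paused rather than aborted, so that they can resume once
+     * the shard is reallocated to another node.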
+ */ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean removingLocalNode, SnapshotsInProgress.Entry entry) { if (entry.isClone()) { // This is a snapshot clone, it will be executed on the current master @@ -364,8 +419,7 @@ private Runnable newShardSnapshotTask( final IndexVersion entryVersion, final long entryStartTime ) { - // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry - return () -> snapshot(shardId, snapshot, indexId, snapshotStatus, entryVersion, entryStartTime, new ActionListener<>() { + ActionListener snapshotResultListener = new ActionListener<>() { @Override public void onResponse(ShardSnapshotResult shardSnapshotResult) { final ShardGeneration newGeneration = shardSnapshotResult.getGeneration(); @@ -405,7 +459,15 @@ public void onFailure(Exception e) { final var shardState = snapshotStatus.moveToUnsuccessful(nextStage, failure, threadPool.absoluteTimeInMillis()); notifyUnsuccessfulSnapshotShard(snapshot, shardId, shardState, failure, snapshotStatus.generation()); } + }; + + snapshotShutdownProgressTracker.incNumberOfShardSnapshotsInProgress(shardId, snapshot); + var decTrackerRunsBeforeResultListener = ActionListener.runAfter(snapshotResultListener, () -> { + snapshotShutdownProgressTracker.decNumberOfShardSnapshotsInProgress(shardId, snapshot, snapshotStatus); }); + + // separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry + return () -> snapshot(shardId, snapshot, indexId, snapshotStatus, entryVersion, entryStartTime, decTrackerRunsBeforeResultListener); } // package private for testing @@ -665,19 +727,25 @@ private void notifyUnsuccessfulSnapshotShard( /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { + ActionListener updateResultListener = new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e); + } + }; + snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId); + var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> { + snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); + }); + remoteFailedRequestDeduplicator.executeOnce( new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), - new ActionListener<>() { - @Override - public void onResponse(Void aVoid) { - logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status); - } - - @Override - public void onFailure(Exception e) { - logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e); - } - }, + releaseTrackerRequestRunsBeforeResultListener, (req, reqListener) -> transportService.sendRequest( transportService.getLocalNode(), SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java new file mode 100644 index 0000000000000..5d81e3c4e46af --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ResultDeduplicator;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+/**
+ * Tracks progress of shard snapshots during shutdown, on this single data node. Periodically reports progress via logging, at an
+ * interval controlled by {@link #SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING}.
+ */
+public class SnapshotShutdownProgressTracker {
+
+    /** How frequently shard snapshot progress is logged after receiving local node shutdown metadata. */
+    public static final Setting<TimeValue> SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING = Setting.timeSetting(
+        "snapshots.shutdown.progress.interval",
+        TimeValue.timeValueSeconds(5),
+        TimeValue.MINUS_ONE,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    private static final Logger logger = LogManager.getLogger(SnapshotShutdownProgressTracker.class);
+
+    private final Supplier<String> getLocalNodeId;
+    private final ThreadPool threadPool;
+
+    private volatile TimeValue progressLoggerInterval;
+    private Scheduler.Cancellable scheduledProgressLoggerFuture;
+
+    /**
+     * The time at which the cluster state update began that found a shutdown signal for this node. Negative value means unset (node is
+     * not shutting down).
+     */
+    private volatile long shutdownStartMillis = -1;
+
+    /**
+     * The time at which the cluster state finished setting shard snapshot states to PAUSING, which the shard snapshot operations will
+     * discover asynchronously. Negative value means unset (node is not shutting down).
+     */
+    private volatile long shutdownFinishedSignallingPausingMillis = -1;
+
+    /**
+     * Tracks the number of shard snapshots that have started on the data node but not yet finished.
+     */
+    private final AtomicLong numberOfShardSnapshotsInProgressOnDataNode = new AtomicLong();
+
+    /**
+     * The logic to track shard snapshot status update requests to master can result in duplicate requests (see
+     * {@link ResultDeduplicator}), as well as resending requests if the elected master changes.
+     * Tracking specific requests uniquely by snapshot ID + shard ID de-duplicates requests for tracking.
+     * Also tracks the absolute start time of registration, to report duration on de-registration.
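+     * Map keys concatenate the snapshot and shard identifiers; map values are the registration times in nanoseconds, used to report how
+     * long each request took.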
+ */ + private final Map shardSnapshotRequests = ConcurrentCollections.newConcurrentMap(); + + /** + * Track how the shard snapshots reach completion during shutdown: did they fail, succeed or pause? + */ + private final AtomicLong doneCount = new AtomicLong(); + private final AtomicLong failureCount = new AtomicLong(); + private final AtomicLong abortedCount = new AtomicLong(); + private final AtomicLong pausedCount = new AtomicLong(); + + public SnapshotShutdownProgressTracker(Supplier localNodeIdSupplier, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.getLocalNodeId = localNodeIdSupplier; + clusterSettings.initializeAndWatch( + SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING, + value -> this.progressLoggerInterval = value + ); + this.threadPool = threadPool; + } + + private void scheduleProgressLogger() { + if (progressLoggerInterval.millis() > 0) { + scheduledProgressLoggerFuture = threadPool.scheduleWithFixedDelay( + this::logProgressReport, + progressLoggerInterval, + threadPool.executor(ThreadPool.Names.GENERIC) + ); + logger.debug( + () -> Strings.format( + "Starting shutdown snapshot progress logging on node [%s], runs every [%s]", + getLocalNodeId.get(), + progressLoggerInterval + ) + ); + } else { + logger.debug("Snapshot progress logging during shutdown is disabled"); + } + } + + private void cancelProgressLogger() { + assert scheduledProgressLoggerFuture != null : "Somehow shutdown mode was removed before it was added."; + scheduledProgressLoggerFuture.cancel(); + if (progressLoggerInterval.millis() > 0) { + // Only log cancellation if it was most likely started. Theoretically the interval setting could be updated during shutdown, + // such that the progress logger is already running and ignores the new value, but that does not currently happen. + logger.debug(() -> Strings.format("Cancelling shutdown snapshot progress logging on node [%s]", getLocalNodeId.get())); + } + } + + /** + * Logs some statistics about shard snapshot progress. + */ + private void logProgressReport() { + logger.info( + """ + Current active shard snapshot stats on data node [{}]. \ + Node shutdown cluster state update received at [{}]. \ + Finished signalling shard snapshots to pause at [{}]. \ + Number shard snapshots running [{}]. \ + Number shard snapshots waiting for master node reply to status update request [{}] \ + Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\ + """, + getLocalNodeId.get(), + shutdownStartMillis, + shutdownFinishedSignallingPausingMillis, + numberOfShardSnapshotsInProgressOnDataNode.get(), + shardSnapshotRequests.size(), + doneCount.get(), + failureCount.get(), + abortedCount.get(), + pausedCount.get() + ); + } + + /** + * Called as soon as a node shutdown signal is received. + */ + public void onClusterStateAddShutdown() { + assert this.shutdownStartMillis == -1 : "Expected not to be tracking anything. Call shutdown remove before adding shutdown again"; + + // Reset these values when a new shutdown occurs, to minimize/eliminate chances of racing if shutdown is later removed and async + // shard snapshots updates continue to occur. + doneCount.set(0); + failureCount.set(0); + abortedCount.set(0); + pausedCount.set(0); + + // Track the timestamp of shutdown signal, on which to base periodic progress logging. + this.shutdownStartMillis = threadPool.relativeTimeInMillis(); + + // Start logging periodic progress reports. 
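+        // (A no-op, apart from a debug message, if the progress logging interval setting is not a positive value.)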
+ scheduleProgressLogger(); + } + + /** + * Called when the cluster state update processing a shutdown signal has finished signalling (setting PAUSING) all shard snapshots to + * pause. + */ + public void onClusterStatePausingSetForAllShardSnapshots() { + assert this.shutdownStartMillis != -1 + : "Should not have left shutdown mode before finishing processing the cluster state update with shutdown"; + this.shutdownFinishedSignallingPausingMillis = threadPool.relativeTimeInMillis(); + logger.debug(() -> Strings.format("Pause signals have been set for all shard snapshots on data node [%s]", getLocalNodeId.get())); + } + + /** + * The cluster state indicating that a node is to be shutdown may be cleared instead of following through with node shutdown. In that + * case, no further shutdown shard snapshot progress reporting is desired. + */ + public void onClusterStateRemoveShutdown() { + assert shutdownStartMillis != -1 : "Expected a call to add shutdown mode before a call to remove shutdown mode."; + + // Reset the shutdown specific trackers. + this.shutdownStartMillis = -1; + this.shutdownFinishedSignallingPausingMillis = -1; + + // Turn off the progress logger, which we only want to run during shutdown. + cancelProgressLogger(); + } + + /** + * Tracks how many shard snapshots are started. + */ + public void incNumberOfShardSnapshotsInProgress(ShardId shardId, Snapshot snapshot) { + logger.debug(() -> Strings.format("Started shard (shard ID: [%s]) in snapshot ([%s])", shardId, snapshot)); + numberOfShardSnapshotsInProgressOnDataNode.incrementAndGet(); + } + + /** + * Tracks how many shard snapshots have finished since shutdown mode began. + */ + public void decNumberOfShardSnapshotsInProgress(ShardId shardId, Snapshot snapshot, IndexShardSnapshotStatus shardSnapshotStatus) { + logger.debug( + () -> Strings.format( + "Finished shard (shard ID: [%s]) in snapshot ([%s]) with status ([%s]): ", + shardId, + snapshot, + shardSnapshotStatus.toString() + ) + ); + + numberOfShardSnapshotsInProgressOnDataNode.decrementAndGet(); + if (shutdownStartMillis != -1) { + switch (shardSnapshotStatus.getStage()) { + case DONE -> doneCount.incrementAndGet(); + case FAILURE -> failureCount.incrementAndGet(); + case ABORTED -> abortedCount.incrementAndGet(); + case PAUSED -> pausedCount.incrementAndGet(); + // The other stages are active, we should only see the end result because this method is called upon completion. + default -> { + assert false : "unexpected shard snapshot stage transition during shutdown: " + shardSnapshotStatus.getStage(); + } + } + } + } + + /** + * Uniquely tracks a request to update a shard snapshot status sent to the master node. Idempotent, safe to call multiple times. + * + * @param snapshot first part of a unique tracking identifier + * @param shardId second part of a unique tracking identifier + */ + public void trackRequestSentToMaster(Snapshot snapshot, ShardId shardId) { + logger.debug(() -> Strings.format("Tracking shard (shard ID: [%s]) snapshot ([%s]) request to master", shardId, snapshot)); + shardSnapshotRequests.put(snapshot.toString() + shardId.getIndexName() + shardId.getId(), threadPool.relativeTimeInNanos()); + } + + /** + * Stops tracking a request to update a shard snapshot status sent to the master node. Idempotent, safe to call multiple times. 
+     *
+     * @param snapshot first part of a unique tracking identifier
+     * @param shardId second part of a unique tracking identifier
+     */
+    public void releaseRequestSentToMaster(Snapshot snapshot, ShardId shardId) {
+        var masterRequestStartTime = shardSnapshotRequests.remove(snapshot.toString() + shardId.getIndexName() + shardId.getId());
+        // This method may be called multiple times. Only log if this is the first time, and the entry hasn't already been removed.
+        if (masterRequestStartTime != null) {
+            logger.debug(
+                () -> Strings.format(
+                    "Finished shard (shard ID: [%s]) snapshot ([%s]) update request to master in [%s]",
+                    shardId,
+                    snapshot,
+                    new TimeValue(threadPool.relativeTimeInNanos() - masterRequestStartTime.longValue(), TimeUnit.NANOSECONDS)
+                )
+            );
+        }
+    }
+
+    // Test only
+    void assertStatsForTesting(long done, long failure, long aborted, long paused) {
+        assert doneCount.get() == done : "doneCount is " + doneCount.get() + ", expected count was " + done;
+        assert failureCount.get() == failure : "failureCount is " + failureCount.get() + ", expected count was " + failure;
+        assert abortedCount.get() == aborted : "abortedCount is " + abortedCount.get() + ", expected count was " + aborted;
+        assert pausedCount.get() == paused : "pausedCount is " + pausedCount.get() + ", expected count was " + paused;
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java
new file mode 100644
index 0000000000000..fbf742ae2ea57
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */ + +package org.elasticsearch.snapshots; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.ShardGeneration; +import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public class SnapshotShutdownProgressTrackerTests extends ESTestCase { + private static final Logger logger = LogManager.getLogger(SnapshotShutdownProgressTrackerTests.class); + + final Settings settings = Settings.builder() + .put( + SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(500) + ) + .build(); + final Settings disabledTrackerLoggingSettings = Settings.builder() + .put(SnapshotShutdownProgressTracker.SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + DeterministicTaskQueue deterministicTaskQueue; + + // Construction parameters for the Tracker. + ThreadPool testThreadPool; + private final Supplier getLocalNodeIdSupplier = () -> "local-node-id-for-test"; + private final BiConsumer, Consumer> addSettingsUpdateConsumerNoOp = (setting, updateMethod) -> {}; + + // Set up some dummy shard snapshot information to feed the Tracker. 
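+    // The tracker only uses these values for log messages and request-tracking map keys, so any stable identifiers will do.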
+ private final ShardId dummyShardId = new ShardId(new Index("index-name-for-test", "index-uuid-for-test"), 0); + private final Snapshot dummySnapshot = new Snapshot( + "snapshot-repo-name-for-test", + new SnapshotId("snapshot-name-for-test", "snapshot-uuid-for-test") + ); + Function dummyShardSnapshotStatusSupplier = (stage) -> { + var shardGen = new ShardGeneration("shard-gen-string-for-test"); + IndexShardSnapshotStatus newStatus = IndexShardSnapshotStatus.newInitializing(new ShardGeneration("shard-gen-string-for-test")); + switch (stage) { + case DONE -> { + newStatus.moveToStarted(0L, 1, 10, 2L, 20L); + newStatus.moveToFinalize(); + newStatus.moveToDone(10L, new ShardSnapshotResult(shardGen, ByteSizeValue.MINUS_ONE, 2)); + } + case ABORTED -> newStatus.abortIfNotCompleted("snapshot-aborted-for-test", (listener) -> {}); + case FAILURE -> newStatus.moveToFailed(300, "shard-snapshot-failure-string for-test"); + case PAUSED -> { + newStatus.pauseIfNotCompleted((listener) -> {}); + newStatus.moveToUnsuccessful(IndexShardSnapshotStatus.Stage.PAUSED, "shard-paused-string-for-test", 100L); + } + default -> newStatus.pauseIfNotCompleted((listener) -> {}); + } + return newStatus; + }; + + @Before + public void setUpThreadPool() { + deterministicTaskQueue = new DeterministicTaskQueue(); + testThreadPool = deterministicTaskQueue.getThreadPool(); + } + + /** + * Increments the tracker's shard snapshot completion stats. Evenly adds to each type of {@link IndexShardSnapshotStatus.Stage} stat + * supported by the tracker. + */ + void simulateShardSnapshotsCompleting(SnapshotShutdownProgressTracker tracker, int numShardSnapshots) { + for (int i = 0; i < numShardSnapshots; ++i) { + tracker.incNumberOfShardSnapshotsInProgress(dummyShardId, dummySnapshot); + IndexShardSnapshotStatus status; + switch (i % 4) { + case 0 -> status = dummyShardSnapshotStatusSupplier.apply(IndexShardSnapshotStatus.Stage.DONE); + case 1 -> status = dummyShardSnapshotStatusSupplier.apply(IndexShardSnapshotStatus.Stage.ABORTED); + case 2 -> status = dummyShardSnapshotStatusSupplier.apply(IndexShardSnapshotStatus.Stage.FAILURE); + case 3 -> status = dummyShardSnapshotStatusSupplier.apply(IndexShardSnapshotStatus.Stage.PAUSED); + // decNumberOfShardSnapshotsInProgress will throw an assertion if this value is ever set. + default -> status = dummyShardSnapshotStatusSupplier.apply(IndexShardSnapshotStatus.Stage.PAUSING); + } + logger.info("---> Generated shard snapshot status in stage (" + status.getStage() + ") for switch case (" + (i % 4) + ")"); + tracker.decNumberOfShardSnapshotsInProgress(dummyShardId, dummySnapshot, status); + } + } + + public void testTrackerLogsStats() { + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettings, + testThreadPool + ); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "unset shard snapshot completion stats", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "*snapshots to pause [-1]*Done [0]; Failed [0]; Aborted [0]; Paused [0]*" + ) + ); + + // Simulate starting shutdown -- should reset the completion stats and start logging + tracker.onClusterStateAddShutdown(); + + // Wait for the initial progress log message with no shard snapshot completions. 
+ deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "shard snapshot completed stats", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "*Shard snapshot completion stats since shutdown began: Done [2]; Failed [1]; Aborted [1]; Paused [1]*" + ) + ); + + // Simulate updating the shard snapshot completion stats. + simulateShardSnapshotsCompleting(tracker, 5); + tracker.assertStatsForTesting(2, 1, 1, 1); + + // Wait for the next periodic log message to include the new completion stats. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + } + + /** + * Test that {@link SnapshotShutdownProgressTracker#SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING} can be disabled by setting + * a value of {@link TimeValue#MINUS_ONE}. This will disable progress logging, though the Tracker will continue to track things. + */ + @TestLogging( + value = "org.elasticsearch.snapshots.SnapshotShutdownProgressTracker:DEBUG", + reason = "Test checks for DEBUG-level log message" + ) + public void testTrackerProgressLoggingIntervalSettingCanBeDisabled() { + ClusterSettings clusterSettingsDisabledLogging = new ClusterSettings( + disabledTrackerLoggingSettings, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS + ); + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettingsDisabledLogging, + testThreadPool + ); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "disabled logging message", + SnapshotShutdownProgressTracker.class.getName(), + Level.DEBUG, + "Snapshot progress logging during shutdown is disabled" + ) + ); + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "no progress logging message", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "Current active shard snapshot stats on data node*" + ) + ); + + // Simulate starting shutdown -- no logging will start because the Tracker logging is disabled. + tracker.onClusterStateAddShutdown(); + tracker.onClusterStatePausingSetForAllShardSnapshots(); + + // Wait for the logging disabled message. + deterministicTaskQueue.runAllTasks(); + mockLog.awaitAllExpectationsMatched(); + } + } + + @TestLogging( + value = "org.elasticsearch.snapshots.SnapshotShutdownProgressTracker:DEBUG", + reason = "Test checks for DEBUG-level log message" + ) + public void testTrackerIntervalSettingDynamically() { + ClusterSettings clusterSettingsDisabledLogging = new ClusterSettings( + disabledTrackerLoggingSettings, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS + ); + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettingsDisabledLogging, + testThreadPool + ); + // Re-enable the progress logging + clusterSettingsDisabledLogging.applySettings(settings); + + // Check that the logging is active. 
+ try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "disabled logging message", + SnapshotShutdownProgressTracker.class.getName(), + Level.DEBUG, + "Snapshot progress logging during shutdown is disabled" + ) + ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "progress logging message", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "Current active shard snapshot stats on data node*" + ) + ); + + // Simulate starting shutdown -- progress logging should begin. + tracker.onClusterStateAddShutdown(); + tracker.onClusterStatePausingSetForAllShardSnapshots(); + + // Wait for the progress logging message + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + } + + public void testTrackerPauseTimestamp() { + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettings, + testThreadPool + ); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "pausing timestamp should be set", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*Finished signalling shard snapshots to pause at [" + testThreadPool.relativeTimeInMillis() + "]*" + ) + ); + + // Simulate starting shutdown -- start logging. + tracker.onClusterStateAddShutdown(); + + // Set a pausing complete timestamp. + tracker.onClusterStatePausingSetForAllShardSnapshots(); + + // Wait for the first log message to ensure the pausing timestamp was set. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + } + + public void testTrackerRequestsToMaster() { + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettings, + testThreadPool + ); + Snapshot snapshot = new Snapshot("repositoryName", new SnapshotId("snapshotName", "snapshotUUID")); + ShardId shardId = new ShardId(new Index("indexName", "indexUUID"), 0); + + // Simulate starting shutdown -- start logging. + tracker.onClusterStateAddShutdown(); + + // Set a pausing complete timestamp. + tracker.onClusterStatePausingSetForAllShardSnapshots(); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "one master status update request", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*master node reply to status update request [1]*" + ) + ); + + tracker.trackRequestSentToMaster(snapshot, shardId); + + // Wait for the first log message to ensure the pausing timestamp was set. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "no master status update requests", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*master node reply to status update request [0]*" + ) + ); + + tracker.releaseRequestSentToMaster(snapshot, shardId); + + // Wait for the first log message to ensure the pausing timestamp was set. 
+ deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + } + + public void testTrackerClearShutdown() { + SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( + getLocalNodeIdSupplier, + clusterSettings, + testThreadPool + ); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "pausing timestamp should be unset", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*Finished signalling shard snapshots to pause at [-1]*" + ) + ); + + // Simulate starting shutdown -- start logging. + tracker.onClusterStateAddShutdown(); + + // Set a pausing complete timestamp. + tracker.onClusterStatePausingSetForAllShardSnapshots(); + + // Wait for the first log message to ensure the pausing timestamp was set. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "logging completed shard snapshot stats", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*Done [2]; Failed [2]; Aborted [2]; Paused [1]*" + ) + ); + + // Simulate updating the shard snapshot completion stats. + simulateShardSnapshotsCompleting(tracker, 7); + tracker.assertStatsForTesting(2, 2, 2, 1); + + // Wait for the first log message to ensure the pausing timestamp was set. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + + // Clear start and pause timestamps + tracker.onClusterStateRemoveShutdown(); + + try (var mockLog = MockLog.capture(Coordinator.class, SnapshotShutdownProgressTracker.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "completed shard snapshot stats are reset", + SnapshotShutdownProgressTracker.class.getName(), + Level.INFO, + "*Done [0]; Failed [0]; Aborted [0]; Paused [0]" + ) + ); + + // Start logging again and check that the pause timestamp was reset from the last time. + tracker.onClusterStateAddShutdown(); + + // Wait for the first log message to ensure the pausing timestamp was set. + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + mockLog.awaitAllExpectationsMatched(); + } + + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 5a40816c94beb..87a834d6424b7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -45,6 +45,7 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -1545,6 +1546,37 @@ protected final DocWriteResponse indexDoc(String index, String id, Object... 
sou return prepareIndex(index).setId(id).setSource(source).get(); } + /** + * Runs random indexing until each shard in the given index is at least minBytesPerShard in size. + * Force merges all cluster shards down to one segment, and then invokes refresh to ensure all shard data is visible for readers, + * before returning. + * + * @return The final {@link ShardStats} for all shards of the index. + */ + protected ShardStats[] indexAllShardsToAnEqualOrGreaterMinimumSize(final String indexName, long minBytesPerShard) { + while (true) { + indexRandom(false, indexName, scaledRandomIntBetween(100, 10000)); + forceMerge(); + refresh(); + + final ShardStats[] shardStats = indicesAdmin().prepareStats(indexName) + .clear() + .setStore(true) + .setTranslog(true) + .get() + .getShards(); + + var smallestShardSize = Arrays.stream(shardStats) + .mapToLong(it -> it.getStats().getStore().sizeInBytes()) + .min() + .orElseThrow(() -> new AssertionError("no shards")); + + if (smallestShardSize >= minBytesPerShard) { + return shardStats; + } + } + } + /** * Syntactic sugar for: *