From 8c088d50730e4d833343ab0e6b3f9e4c709aace7 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 10 Sep 2024 18:08:56 +0200 Subject: [PATCH] [test] Track index commits internally acquired by the commits listener in CombinedDeletionPolicy (#112507) After finishing an integration test, we run some checks against the test cluster, among others we assert there are no leaky acquired index commits left in `InternalTestCluster#beforeIndexDeletion`. The issue is that while we check the test cluster before we shut it down, we can't guarantee that there wouldn't be a new commit triggered by a background merge which will acquire an index commit. But we actually don't care about these commits acquired internally as part of `CombinedDeletionPolicy#commitsListener` callbacks. We just want to make sure that all index commits that have been acquired explicitly are also released. So, we make an explicit distinction between external and internal index commits that are tracked in `CombinedDeletionPolicy`. ES-8407 --- .../index/engine/CombinedDeletionPolicy.java | 43 +++++++++++++++---- .../index/engine/InternalEngine.java | 4 +- .../index/engine/EngineTestCase.java | 4 +- .../test/InternalTestCluster.java | 2 +- .../CcrRestoreSourceServiceTests.java | 4 +- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 22bab1742589e..43b0c27d30580 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; private final Map acquiredIndexCommits; // Number of references held against each commit point. + // Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them + // when checking for externally acquired index commits that haven't been released + private final Set internallyAcquiredIndexCommits; interface CommitsListener { @@ -72,6 +75,7 @@ interface CommitsListener { this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; this.acquiredIndexCommits = new HashMap<>(); + this.internallyAcquiredIndexCommits = new HashSet<>(); } @Override @@ -114,7 +118,7 @@ public void onCommit(List commits) throws IOException { this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); } if (commitsListener != null && previousLastCommit != this.lastCommit) { - newCommit = acquireIndexCommit(false); + newCommit = acquireIndexCommit(false, true); } else { newCommit = null; } @@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() { * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. */ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + return acquireIndexCommit(acquiringSafeCommit, false); + } + + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - return wrapCommit(snapshotting); + assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting) + : "commit [" + snapshotting + "] already added"; + return wrapCommit(snapshotting, acquiredInternally); } protected IndexCommit wrapCommit(IndexCommit indexCommit) { - return new SnapshotIndexCommit(indexCommit); + return wrapCommit(indexCommit, false); + } + + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) { + return new SnapshotIndexCommit(indexCommit, acquiredInternally); } /** @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) { * @return true if the acquired commit can be clean up. */ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { - final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit(); + final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; + final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) : "Release non-acquired commit;" + "acquired commits [" @@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); + assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit) + : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. @@ -296,10 +313,16 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit } /** - * Checks whether the deletion policy is holding on to acquired index commits + * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasAcquiredIndexCommits() { - return acquiredIndexCommits.isEmpty() == false; + synchronized boolean hasAcquiredIndexCommitsForTesting() { + // We explicitly check only external commits and disregard internal commits acquired by the commits listener + for (var e : acquiredIndexCommits.entrySet()) { + if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { + return true; + } + } + return false; } /** @@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException { * A wrapper of an index commit that prevents it from being deleted. */ private static class SnapshotIndexCommit extends FilterIndexCommit { - SnapshotIndexCommit(IndexCommit delegate) { + + private final boolean acquiredInternally; + + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) { super(delegate); + this.acquiredInternally = acquiredInternally; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9743ee977a8c4..7c456f55ac8ad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -669,8 +669,8 @@ Translog getTranslog() { } // Package private for testing purposes only - boolean hasAcquiredIndexCommits() { - return combinedDeletionPolicy.hasAcquiredIndexCommits(); + boolean hasAcquiredIndexCommitsForTesting() { + return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8412e9e250885..5387108592b10 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo))); } - public static boolean hasAcquiredIndexCommits(Engine engine) { + public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.hasAcquiredIndexCommits(); + return internalEngine.hasAcquiredIndexCommitsForTesting(); } public static final class PrimaryTermSupplier implements LongSupplier { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 77762544c4718..823f5084f5d59 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1369,7 +1369,7 @@ private void assertNoAcquiredIndexCommit() throws Exception { if (engine instanceof InternalEngine) { assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", - EngineTestCase.hasAcquiredIndexCommits(engine) + EngineTestCase.hasAcquiredIndexCommitsForTesting(engine) ); } } catch (AlreadyClosedException ignored) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 99344f22bae31..f577ccd4e5a44 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false)); } - assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); - assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); closeShards(indexShard); // Exception will be thrown if file is not closed.