Skip to content

Commit

Permalink
[test] Track index commits internally acquired by the commits listene…
Browse files Browse the repository at this point in the history
…r in CombinedDeletionPolicy (elastic#112507)

After finishing an integration test, we run some checks against the test
cluster, among others we assert there are no leaky acquired index
commits left in `InternalTestCluster#beforeIndexDeletion`. The issue is
that while we check the test cluster before we shut it down, we can't
guarantee that there wouldn't be a new commit triggered by a background
merge which will acquire an index commit. But we actually don't care
about these commits acquired internally as part of
`CombinedDeletionPolicy#commitsListener` callbacks. We just want to make
sure that all index commits that have been acquired explicitly are also
released.  So, we make an explicit distinction between external and
internal index commits that are tracked in  `CombinedDeletionPolicy`.

ES-8407

<!-- Please don't remove the machine data below, even if you remove the
linking label --> <!--es-delivery-machine-data
{"linkedServerlessPr":2694,"linkedStatefulPr":112507}
es-delivery-machine-data-->
  • Loading branch information
arteam authored Sep 10, 2024
1 parent 9081a95 commit 8c088d5
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final Map<IndexCommit, Integer> acquiredIndexCommits; // Number of references held against each commit point.
// Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them
// when checking for externally acquired index commits that haven't been released
private final Set<IndexCommit> internallyAcquiredIndexCommits;

interface CommitsListener {

Expand Down Expand Up @@ -72,6 +75,7 @@ interface CommitsListener {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.commitsListener = commitsListener;
this.acquiredIndexCommits = new HashMap<>();
this.internallyAcquiredIndexCommits = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
if (commitsListener != null && previousLastCommit != this.lastCommit) {
newCommit = acquireIndexCommit(false);
newCommit = acquireIndexCommit(false, true);
} else {
newCommit = null;
}
Expand Down Expand Up @@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() {
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
return acquireIndexCommit(acquiringSafeCommit, false);
}

private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
return wrapCommit(snapshotting);
assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting)
: "commit [" + snapshotting + "] already added";
return wrapCommit(snapshotting, acquiredInternally);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit) {
return new SnapshotIndexCommit(indexCommit);
return wrapCommit(indexCommit, false);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) {
return new SnapshotIndexCommit(indexCommit, acquiredInternally);
}

/**
Expand All @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) {
* @return true if the acquired commit can be clean up.
*/
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit();
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
assert acquiredIndexCommits.containsKey(releasingCommit)
: "Release non-acquired commit;"
+ "acquired commits ["
Expand All @@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
}
return count - 1;
});
assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit)
: "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally";

assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]";
// The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.
Expand Down Expand Up @@ -296,10 +313,16 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
}

/**
* Checks whether the deletion policy is holding on to acquired index commits
* Checks whether the deletion policy is holding on to externally acquired index commits
*/
synchronized boolean hasAcquiredIndexCommits() {
return acquiredIndexCommits.isEmpty() == false;
synchronized boolean hasAcquiredIndexCommitsForTesting() {
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
for (var e : acquiredIndexCommits.entrySet()) {
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
return true;
}
}
return false;
}

/**
Expand All @@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException {
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends FilterIndexCommit {
SnapshotIndexCommit(IndexCommit delegate) {

private final boolean acquiredInternally;

SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) {
super(delegate);
this.acquiredInternally = acquiredInternally;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ Translog getTranslog() {
}

// Package private for testing purposes only
boolean hasAcquiredIndexCommits() {
return combinedDeletionPolicy.hasAcquiredIndexCommits();
boolean hasAcquiredIndexCommitsForTesting() {
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw
assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo)));
}

public static boolean hasAcquiredIndexCommits(Engine engine) {
public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.hasAcquiredIndexCommits();
return internalEngine.hasAcquiredIndexCommitsForTesting();
}

public static final class PrimaryTermSupplier implements LongSupplier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ private void assertNoAcquiredIndexCommit() throws Exception {
if (engine instanceof InternalEngine) {
assertFalse(
indexShard.routingEntry().toString() + " has unreleased snapshotted index commits",
EngineTestCase.hasAcquiredIndexCommits(engine)
EngineTestCase.hasAcquiredIndexCommitsForTesting(engine)
);
}
} catch (AlreadyClosedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException {
sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false));
}

assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));
restoreSourceService.closeSession(sessionUUID);
assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));

closeShards(indexShard);
// Exception will be thrown if file is not closed.
Expand Down

0 comments on commit 8c088d5

Please sign in to comment.