Skip to content

Commit

Permalink
Do not advance the range we're processing for durability scheduling u…
Browse files Browse the repository at this point in the history
…ntil success; clear Recover.recoverOks to free up memory once quorum reached; introduce Cancellable
  • Loading branch information
belliottsmith committed Oct 13, 2024
1 parent 35704ba commit 70e61b4
Show file tree
Hide file tree
Showing 18 changed files with 375 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void persist(Node node, Topologies any, FullRoute<?> route, Participants<
Topologies all = forExecution(node, route, txnId, executeAt, deps);

invokeSuccess(node, route, txnId, txn, deps, callback);
new PersistExclusiveSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result)
new PersistSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result)
.start(Apply.FACTORY, Maximal, any, writes, result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import accord.topology.Topologies;
import accord.utils.SortedArrays;

public class PersistExclusiveSyncPoint extends Persist
public class PersistSyncPoint extends Persist
{
public PersistExclusiveSyncPoint(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
public PersistSyncPoint(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
{
super(node, topologies, txnId, route, txn, executeAt, deps, writes, result);
}
Expand Down
36 changes: 21 additions & 15 deletions accord-core/src/main/java/accord/coordinate/Recover.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ AsyncResult<Object> awaitCommits(Node node, Deps waitOn)
private final BiConsumer<Outcome, Throwable> callback;
private boolean isDone;

private final SortedListMap<Id, RecoverOk> recoverOks;
private SortedListMap<Id, RecoverOk> recoverOks;
private final RecoveryTracker tracker;
private boolean isBallotPromised;

Expand Down Expand Up @@ -199,6 +199,8 @@ private void recover()
Invariants.checkState(!isBallotPromised);
isBallotPromised = true;

SortedListMap<Id, RecoverOk> recoverOks = this.recoverOks;
if (!Invariants.debug()) this.recoverOks = null;
List<RecoverOk> recoverOkList = recoverOks.valuesAsNullableList();
RecoverOk acceptOrCommit = maxAccepted(recoverOkList);
RecoverOk acceptOrCommitNotTruncated = acceptOrCommit == null || acceptOrCommit.status != Status.Truncated
Expand All @@ -213,14 +215,14 @@ private void recover()
case Truncated: throw illegalState("Truncate should be filtered");
case Invalidated:
{
commitInvalidate();
commitInvalidate(invalidateUntil(recoverOks));
return;
}

case Applied:
case PreApplied:
{
withCommittedDeps(executeAt, (stableDeps, withEpochFailure) -> {
withCommittedDeps(recoverOkList, executeAt, (stableDeps, withEpochFailure) -> {
if (withEpochFailure != null)
{
node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure));
Expand All @@ -242,7 +244,7 @@ private void recover()
{
// TODO (required): if we have to calculate deps for any shard, should we first ensure they are stable?
// it should only be possible for a fast path decision, but we might have Stable in only one shard.
withCommittedDeps(executeAt, (stableDeps, withEpochFailure) -> {
withCommittedDeps(recoverOkList, executeAt, (stableDeps, withEpochFailure) -> {
if (withEpochFailure != null)
{
node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure));
Expand All @@ -257,7 +259,7 @@ private void recover()
case Committed:
{
// TODO (expected): should we only ask for txns with t0 < t0' < t?
withCommittedDeps(executeAt, (committedDeps, withEpochFailure) -> {
withCommittedDeps(recoverOkList, executeAt, (committedDeps, withEpochFailure) -> {
if (withEpochFailure != null)
{
node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure));
Expand All @@ -280,7 +282,7 @@ private void recover()

case AcceptedInvalidate:
{
invalidate();
invalidate(recoverOks);
return;
}

Expand Down Expand Up @@ -321,7 +323,7 @@ private void recover()

if (tracker.rejectsFastPath() || recoverOks.valuesAsNullableStream().anyMatch(ok -> ok != null && ok.rejectsFastPath))
{
invalidate();
invalidate(recoverOks);
return;
}

Expand All @@ -348,9 +350,9 @@ private void recover()
propose(txnId, proposeDeps);
}

private void withCommittedDeps(Timestamp executeAt, BiConsumer<Deps, Throwable> withDeps)
private void withCommittedDeps(List<RecoverOk> nullableRecoverOkList, Timestamp executeAt, BiConsumer<Deps, Throwable> withDeps)
{
LatestDeps.MergedCommitResult merged = LatestDeps.mergeCommit(txnId, executeAt, recoverOks.valuesAsNullableList(), ok -> ok == null ? null : ok.deps);
LatestDeps.MergedCommitResult merged = LatestDeps.mergeCommit(txnId, executeAt, nullableRecoverOkList, ok -> ok == null ? null : ok.deps);
node.withEpoch(executeAt.epoch(), (ignored, withEpochFailure) -> {
if (withEpochFailure != null)
{
Expand Down Expand Up @@ -379,22 +381,26 @@ private void withCommittedDeps(Timestamp executeAt, BiConsumer<Deps, Throwable>
});
}

private void invalidate()
private void invalidate(SortedListMap<Id, RecoverOk> recoverOks)
{
Timestamp invalidateUntil = invalidateUntil(recoverOks);
proposeInvalidate(node, ballot, txnId, route.homeKey(), (success, fail) -> {
if (fail != null) accept(null, fail);
else commitInvalidate();
else commitInvalidate(invalidateUntil);
});
}

private void commitInvalidate()
private Timestamp invalidateUntil(SortedListMap<Id, RecoverOk> recoverOks)
{
// If not accepted then the executeAt is not consistent cross the peers and likely different on every node. There is also an edge case
// when ranges are removed from the topology, during this case the executeAt won't know the ranges and the invalidate commit will fail.
Timestamp invalidateUntil = recoverOks.valuesAsNullableStream()
.map(ok -> ok == null ? null : ok.status.hasBeen(Status.Accepted) ? ok.executeAt : ok.txnId)
.reduce(txnId, Timestamp::nonNullOrMax);
return recoverOks.valuesAsNullableStream()
.map(ok -> ok == null ? null : ok.status.hasBeen(Status.Accepted) ? ok.executeAt : ok.txnId)
.reduce(txnId, Timestamp::nonNullOrMax);
}

private void commitInvalidate(Timestamp invalidateUntil)
{
node.withEpoch(invalidateUntil.epoch(), (ignored, withEpochFailure) -> {
if (withEpochFailure != null)
{
Expand Down
Loading

0 comments on commit 70e61b4

Please sign in to comment.