From 70e61b434182f65fc966b592ec551aabc963c0dc Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Sun, 13 Oct 2024 12:13:28 +0100 Subject: [PATCH] Do not advance the range we're processing for durability scheduling until success; clear Recover.recoverOks to free up memory once quorum reached; introduce Cancellable --- .../coordinate/CoordinationAdapter.java | 2 +- ...veSyncPoint.java => PersistSyncPoint.java} | 4 +- .../main/java/accord/coordinate/Recover.java | 36 ++- .../impl/CoordinateDurabilityScheduling.java | 305 ++++++++++-------- .../accord/impl/InMemoryCommandStore.java | 12 +- .../main/java/accord/local/CommandStore.java | 42 ++- .../main/java/accord/local/CommandStores.java | 4 +- .../java/accord/messages/BeginRecovery.java | 1 - .../java/accord/primitives/LatestDeps.java | 5 +- .../java/accord/utils/async/AsyncChain.java | 4 +- .../utils/async/AsyncChainCombiner.java | 14 +- .../java/accord/utils/async/AsyncChains.java | 139 ++++---- .../java/accord/utils/async/AsyncResult.java | 5 +- .../java/accord/utils/async/AsyncResults.java | 8 +- .../java/accord/utils/async/Cancellable.java | 24 ++ .../java/accord/utils/async/Observable.java | 3 +- .../impl/basic/DelayedCommandStores.java | 4 +- .../accord/utils/async/AsyncChainsTest.java | 12 +- 18 files changed, 375 insertions(+), 249 deletions(-) rename accord-core/src/main/java/accord/coordinate/{PersistExclusiveSyncPoint.java => PersistSyncPoint.java} (89%) create mode 100644 accord-core/src/main/java/accord/utils/async/Cancellable.java diff --git a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java index 2e739ed23..3c52d0a21 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java @@ -229,7 +229,7 @@ public void persist(Node node, Topologies any, FullRoute route, Participants< Topologies all = forExecution(node, route, txnId, executeAt, deps); invokeSuccess(node, route, txnId, txn, deps, callback); - new PersistExclusiveSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result) + new PersistSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result) .start(Apply.FACTORY, Maximal, any, writes, result); } } diff --git a/accord-core/src/main/java/accord/coordinate/PersistExclusiveSyncPoint.java b/accord-core/src/main/java/accord/coordinate/PersistSyncPoint.java similarity index 89% rename from accord-core/src/main/java/accord/coordinate/PersistExclusiveSyncPoint.java rename to accord-core/src/main/java/accord/coordinate/PersistSyncPoint.java index cc6284289..f0be6f96e 100644 --- a/accord-core/src/main/java/accord/coordinate/PersistExclusiveSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/PersistSyncPoint.java @@ -30,9 +30,9 @@ import accord.topology.Topologies; import accord.utils.SortedArrays; -public class PersistExclusiveSyncPoint extends Persist +public class PersistSyncPoint extends Persist { - public PersistExclusiveSyncPoint(Node node, Topologies topologies, TxnId txnId, FullRoute route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) + public PersistSyncPoint(Node node, Topologies topologies, TxnId txnId, FullRoute route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result) { super(node, topologies, txnId, route, txn, executeAt, deps, writes, result); } diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java index 5f89c835e..09ec89e50 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -101,7 +101,7 @@ AsyncResult awaitCommits(Node node, Deps waitOn) private final BiConsumer callback; private boolean isDone; - private final SortedListMap recoverOks; + private SortedListMap recoverOks; private final RecoveryTracker tracker; private boolean isBallotPromised; @@ -199,6 +199,8 @@ private void recover() Invariants.checkState(!isBallotPromised); isBallotPromised = true; + SortedListMap recoverOks = this.recoverOks; + if (!Invariants.debug()) this.recoverOks = null; List recoverOkList = recoverOks.valuesAsNullableList(); RecoverOk acceptOrCommit = maxAccepted(recoverOkList); RecoverOk acceptOrCommitNotTruncated = acceptOrCommit == null || acceptOrCommit.status != Status.Truncated @@ -213,14 +215,14 @@ private void recover() case Truncated: throw illegalState("Truncate should be filtered"); case Invalidated: { - commitInvalidate(); + commitInvalidate(invalidateUntil(recoverOks)); return; } case Applied: case PreApplied: { - withCommittedDeps(executeAt, (stableDeps, withEpochFailure) -> { + withCommittedDeps(recoverOkList, executeAt, (stableDeps, withEpochFailure) -> { if (withEpochFailure != null) { node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure)); @@ -242,7 +244,7 @@ private void recover() { // TODO (required): if we have to calculate deps for any shard, should we first ensure they are stable? // it should only be possible for a fast path decision, but we might have Stable in only one shard. - withCommittedDeps(executeAt, (stableDeps, withEpochFailure) -> { + withCommittedDeps(recoverOkList, executeAt, (stableDeps, withEpochFailure) -> { if (withEpochFailure != null) { node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure)); @@ -257,7 +259,7 @@ private void recover() case Committed: { // TODO (expected): should we only ask for txns with t0 < t0' < t? - withCommittedDeps(executeAt, (committedDeps, withEpochFailure) -> { + withCommittedDeps(recoverOkList, executeAt, (committedDeps, withEpochFailure) -> { if (withEpochFailure != null) { node.agent().onUncaughtException(CoordinationFailed.wrap(withEpochFailure)); @@ -280,7 +282,7 @@ private void recover() case AcceptedInvalidate: { - invalidate(); + invalidate(recoverOks); return; } @@ -321,7 +323,7 @@ private void recover() if (tracker.rejectsFastPath() || recoverOks.valuesAsNullableStream().anyMatch(ok -> ok != null && ok.rejectsFastPath)) { - invalidate(); + invalidate(recoverOks); return; } @@ -348,9 +350,9 @@ private void recover() propose(txnId, proposeDeps); } - private void withCommittedDeps(Timestamp executeAt, BiConsumer withDeps) + private void withCommittedDeps(List nullableRecoverOkList, Timestamp executeAt, BiConsumer withDeps) { - LatestDeps.MergedCommitResult merged = LatestDeps.mergeCommit(txnId, executeAt, recoverOks.valuesAsNullableList(), ok -> ok == null ? null : ok.deps); + LatestDeps.MergedCommitResult merged = LatestDeps.mergeCommit(txnId, executeAt, nullableRecoverOkList, ok -> ok == null ? 
null : ok.deps); node.withEpoch(executeAt.epoch(), (ignored, withEpochFailure) -> { if (withEpochFailure != null) { @@ -379,22 +381,26 @@ private void withCommittedDeps(Timestamp executeAt, BiConsumer }); } - private void invalidate() + private void invalidate(SortedListMap recoverOks) { + Timestamp invalidateUntil = invalidateUntil(recoverOks); proposeInvalidate(node, ballot, txnId, route.homeKey(), (success, fail) -> { if (fail != null) accept(null, fail); - else commitInvalidate(); + else commitInvalidate(invalidateUntil); }); } - private void commitInvalidate() + private Timestamp invalidateUntil(SortedListMap recoverOks) { // If not accepted then the executeAt is not consistent across the peers and likely different on every node. There is also an edge case // when ranges are removed from the topology, in which case the executeAt won't know the ranges and the invalidate commit will fail. - Timestamp invalidateUntil = recoverOks.valuesAsNullableStream() - .map(ok -> ok == null ? null : ok.status.hasBeen(Status.Accepted) ? ok.executeAt : ok.txnId) - .reduce(txnId, Timestamp::nonNullOrMax); + return recoverOks.valuesAsNullableStream() - .map(ok -> ok == null ? null : ok.status.hasBeen(Status.Accepted) ? ok.executeAt : ok.txnId) - .reduce(txnId, Timestamp::nonNullOrMax); + } + private void commitInvalidate(Timestamp invalidateUntil) + { node.withEpoch(invalidateUntil.epoch(), (ignored, withEpochFailure) -> { if (withEpochFailure != null) { diff --git a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java index bb6de074d..fd6270898 100644 --- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java @@ -23,11 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import com.google.common.primitives.Ints; import org.slf4j.Logger; @@ -87,32 +83,30 @@ public class CoordinateDurabilityScheduling private final Node node; private Scheduler.Scheduled scheduled; - private final AtomicReference active = new AtomicReference<>(Ranges.EMPTY); - private final ConcurrentHashMap coordinating = new ConcurrentHashMap<>(); /* * In each round at each node wait this amount of time between initiating new CoordinateShardDurable */ - private int frequencyMicros = Ints.saturatedCast(TimeUnit.MILLISECONDS.toMicros(500L)); + private long frequencyMicros = TimeUnit.MILLISECONDS.toMicros(500L); /* * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId * and coordinating the shard durability */ - private int txnIdLagMicros = Ints.saturatedCast(TimeUnit.SECONDS.toMicros(1L)); + private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(1L); /* * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId * and coordinating the shard durability */ - private int durabilityLagMicros = Ints.saturatedCast(TimeUnit.MILLISECONDS.toMicros(500L)); + private long durabilityLagMicros = TimeUnit.MILLISECONDS.toMicros(500L); /* * Target for how often the entire ring should be processed in microseconds.
Every node will start at an offset in the current round that is based * on this value divided by the total number of replicas, multiplied by its index in the current round. The current round is determined by dividing time since the epoch by this * duration. */ - private int shardCycleTimeMicros = Ints.saturatedCast(TimeUnit.SECONDS.toMicros(30)); + private long shardCycleTimeMicros = TimeUnit.SECONDS.toMicros(30); /* * Every node will independently attempt to invoke CoordinateGloballyDurable @@ -127,13 +121,166 @@ public class CoordinateDurabilityScheduling private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30); private Topology currentGlobalTopology; - private final Map localIndexes = new HashMap<>(); + private final Map shardSchedulers = new HashMap<>(); private int globalIndex; private long nextGlobalSyncTimeMicros; - private long prevShardSyncTimeMicros; volatile boolean stop; + private class ShardScheduler + { + Shard shard; + + // based on ideal number of splits + int nodeOffset; + + int index; + int numberOfSplits, desiredNumberOfSplits; + boolean defunct; + long shardCycleTimeMicros; + Scheduler.Scheduled scheduled; + long rangeStartedAtMicros; + long cycleStartedAtMicros = -1; + + private ShardScheduler() + { + } + + synchronized void update(Shard shard, int offset) + { + this.shard = shard; + this.nodeOffset = offset; + this.shardCycleTimeMicros = Math.max(CoordinateDurabilityScheduling.this.shardCycleTimeMicros, shard.rf() * 3L * frequencyMicros); + this.desiredNumberOfSplits = (int) ((shardCycleTimeMicros + frequencyMicros - 1) / frequencyMicros); + if (numberOfSplits == 0 || numberOfSplits < desiredNumberOfSplits) + { + index = offset; + numberOfSplits = desiredNumberOfSplits; + } + } + + synchronized void markDefunct() + { + defunct = true; + } + + synchronized void schedule() + { + if (defunct) + return; + + long nowMicros = node.elapsed(MICROSECONDS); + int cyclePosition = (nodeOffset + (((index * shard.rf()) + numberOfSplits - 1) / numberOfSplits)) % shard.rf(); + long microsOffset = (cyclePosition * shardCycleTimeMicros) / shard.rf(); + long scheduleAt = nowMicros - (nowMicros % shardCycleTimeMicros) + microsOffset; + if (nowMicros > scheduleAt) + scheduleAt += shardCycleTimeMicros; + + ShardDistributor distributor = node.commandStores().shardDistributor(); + Range range; + int nextIndex; + { + int i = index; + Range selectRange = null; + while (selectRange == null) + selectRange = distributor.splitRange(shard.range, index, ++i, numberOfSplits); + range = selectRange; + nextIndex = i; + } + + scheduled = node.scheduler().once(() -> { + // TODO (required): allocate stale HLC from a reservation of HLCs for this purpose + TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range); + startShardSync(syncId, Ranges.of(range), nextIndex); + }, scheduleAt - nowMicros, MICROSECONDS); + } + + /** + * The first step for coordinating shard durability is to run an exclusive sync point, + * the result of which can then be used to run CoordinateShardDurable. + */ + private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex) + { + scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> { + if (withEpochFailure != null) + { + startShardSync(syncId, ranges, nextIndex); + Throwable wrapped = CoordinationFailed.wrap(withEpochFailure); + logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + syncId.epoch(), wrapped); + node.agent().onUncaughtException(wrapped); + return; + } + scheduled = null; +
rangeStartedAtMicros = node.elapsed(MICROSECONDS); + FullRoute route = (FullRoute) node.computeRoute(syncId, ranges); + exclusiveSyncPoint(node, syncId, route) + .addCallback((success, fail) -> { + if (fail != null) + { + synchronized (ShardScheduler.this) + { + index *= 2; + numberOfSplits *= 2; + // TODO (required): try to recover or invalidate prior sync point + schedule(); + logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); + } + } + else + { // TODO (required): decouple CoordinateShardDurable concurrency from CoordinateSyncPoint (i.e., permit at least one CoordinateSyncPoint to queue up while we're coordinating durability) + coordinateShardDurableAfterExclusiveSyncPoint(node, success, nextIndex); + logger.trace("{}: Successfully coordinated ExclusiveSyncPoint for local shard durability of {}", syncId, ranges); + } + }); + }), txnIdLagMicros, MICROSECONDS); + } + + private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint exclusiveSyncPoint, int nextIndex) + { + scheduled = node.scheduler().once(() -> { + scheduled = null; + node.commandStores().any().execute(() -> { + CoordinateShardDurable.coordinate(node, exclusiveSyncPoint) + .addCallback((success, fail) -> { + if (fail != null && fail.getClass() != SyncPointErased.class) + { + logger.trace("Exception coordinating local shard durability, will retry immediately", fail); + coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex); + } + else + { + synchronized (ShardScheduler.this) + { + index = nextIndex; + if (index >= numberOfSplits) + { + index = 0; + long nowMicros = node.elapsed(MICROSECONDS); + String reportTime = ""; + if (cycleStartedAtMicros > 0) + reportTime = "in " + MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros) + 's'; + logger.info("Successfully completed one cycle of durability scheduling for shard {}{}", shard.range, reportTime); + if (numberOfSplits > desiredNumberOfSplits) + numberOfSplits = Math.max(desiredNumberOfSplits, (int)(numberOfSplits * 0.9)); + cycleStartedAtMicros = nowMicros; + } + else + { + long nowMicros = node.elapsed(MICROSECONDS); + logger.debug("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros)); + } + + schedule(); + } + } + }); + }); + }, durabilityLagMicros, MICROSECONDS); + } + + } + + public CoordinateDurabilityScheduling(Node node) { this.node = node; @@ -171,7 +318,6 @@ public synchronized void start() { Invariants.checkState(!stop); // cannot currently restart safely long nowMicros = node.elapsed(MICROSECONDS); - prevShardSyncTimeMicros = nowMicros; setNextGlobalSyncTime(nowMicros); scheduled = node.scheduler().recurring(this::run, frequencyMicros, MICROSECONDS); } @@ -184,106 +330,27 @@ public void stop() } /** - * Schedule the first CoordinateShardDurable execution for the current round. Sub-steps will be scheduled after - * each sub-step completes, and once all are completed scheduleCoordinateShardDurable is called again. + * Update our topology information, and schedule any global syncs that may be pending. 
*/ private void run() { if (stop) return; + // TODO (expected): invoke this as soon as topology is updated in topology manager updateTopology(); if (currentGlobalTopology == null || currentGlobalTopology.size() == 0) return; + // TODO (expected): schedule this directly based on the global sync frequency - this is an artefact of previously scheduling shard syncs as well long nowMicros = node.elapsed(MICROSECONDS); if (nextGlobalSyncTimeMicros <= nowMicros) { startGlobalSync(); setNextGlobalSyncTime(nowMicros); } - - List coordinate = rangesToShardSync(nowMicros); - prevShardSyncTimeMicros = nowMicros; - if (coordinate.isEmpty()) - { - logger.trace("Nothing pending in schedule for time slot at {}", nowMicros); - return; - } - - logger.trace("Scheduling CoordinateShardDurable for {} at {}", coordinate, nowMicros); - for (Ranges ranges : coordinate) - { - if (ranges.isEmpty()) - continue; - - Ranges inactiveRanges = ranges.without(active.get()); - if (!inactiveRanges.equals(ranges)) - { - String waitingOn = coordinating.entrySet().stream().filter(e -> e.getValue().intersects(ranges)).map(Objects::toString).collect(Collectors.joining(", ", "[", "]")); - logger.info("Not initiating new durability scheduling for {} as previous attempt(s) {} still in progress (scheduling {})", ranges.without(inactiveRanges), waitingOn, inactiveRanges); - if (inactiveRanges.isEmpty()) - continue; - } - active.accumulateAndGet(inactiveRanges, Ranges::with); - startShardSync(inactiveRanges); - } } - /** - * The first step for coordinating shard durable is to run an exclusive sync point - * the result of which can then be used to run - */ - private void startShardSync(Ranges ranges) - { - TxnId at = node.nextTxnId(ExclusiveSyncPoint, Domain.Range); - coordinating.put(at, ranges); - node.scheduler().once(() -> node.withEpoch(at.epoch(), (ignored, withEpochFailure) -> { - FullRoute route = (FullRoute) node.computeRoute(at, ranges); - if (withEpochFailure != null) - { - Throwable wrapped = CoordinationFailed.wrap(withEpochFailure); - logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + at.epoch(), wrapped); - node.agent().onUncaughtException(wrapped); - return; - } - exclusiveSyncPoint(node, at, route) - .addCallback((success, fail) -> { - if (fail != null) - { - logger.trace("{}: Exception coordinating ExclusiveSyncPoint for local shard durability of {}", at, ranges, fail); - coordinating.remove(at); - active.accumulateAndGet(route.toRanges(), Ranges::without); - } - else - { - coordinateShardDurableAfterExclusiveSyncPoint(node, success); - logger.trace("{}: Successfully coordinated ExclusiveSyncPoint for local shard durability of {}", at, ranges); - } - }); - }), txnIdLagMicros, MICROSECONDS); - } - - private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint exclusiveSyncPoint) - { - node.scheduler().once(() -> { - node.commandStores().any().execute(() -> { - CoordinateShardDurable.coordinate(node, exclusiveSyncPoint) - .addCallback((success, fail) -> { - if (fail != null && fail.getClass() != SyncPointErased.class) - { - logger.trace("Exception coordinating local shard durability, will retry immediately", fail); - coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint); - } - else - { - coordinating.remove(exclusiveSyncPoint.syncId); - active.accumulateAndGet(exclusiveSyncPoint.route.toRanges(), Ranges::without); - } - }); - }); - }, durabilityLagMicros, MICROSECONDS); - } private void startGlobalSync() { @@ 
-302,39 +369,6 @@ private void startGlobalSync() } } - private List rangesToShardSync(long nowMicros) - { - ShardDistributor distributor = node.commandStores().shardDistributor(); - List result = new ArrayList<>(); - for (Map.Entry e : localIndexes.entrySet()) - { - Shard shard = e.getKey(); - int index = e.getValue(); - int shardCycleTimeMicros = Math.max(this.shardCycleTimeMicros, Ints.saturatedCast(shard.rf() * 3L * frequencyMicros)); - long microsOffset = (index * (long)shardCycleTimeMicros) / shard.rf(); - long prevSyncTimeMicros = Math.max(prevShardSyncTimeMicros, nowMicros - ((shardCycleTimeMicros / shard.rf()) / 2L)); - int from = (int) ((prevSyncTimeMicros + microsOffset) % shardCycleTimeMicros); - int to = (int) ((nowMicros + microsOffset) % shardCycleTimeMicros); - List ranges = new ArrayList<>(); - if (from > to) - { - Range range = distributor.splitRange(shard.range, to, shardCycleTimeMicros, shardCycleTimeMicros); - if (range != null) - ranges.add(range); - to = from; - from = 0; - } - - Range range = distributor.splitRange(shard.range, from, to, shardCycleTimeMicros); - if (range != null) - ranges.add(range); - - if (!ranges.isEmpty()) - result.add(Ranges.of(ranges.toArray(new Range[0]))); - } - return result; - } - private void updateTopology() { Topology latestGlobal = node.topology().current(); @@ -350,9 +384,20 @@ private void updateTopology() Collections.sort(ids); globalIndex = ids.indexOf(node.id()); - localIndexes.clear(); + Map prev = new HashMap<>(this.shardSchedulers); + this.shardSchedulers.clear(); for (Shard shard : latestLocal.shards()) - localIndexes.put(shard, shard.nodes.find(node.id())); + { + ShardScheduler prevScheduler = prev.remove(shard.range); + ShardScheduler scheduler = prevScheduler; + if (scheduler == null) + scheduler = new ShardScheduler(); + shardSchedulers.put(shard.range, scheduler); + scheduler.update(shard, shard.nodes.find(node.id())); + if (prevScheduler == null) + scheduler.schedule(); + } + prev.forEach((r, s) -> s.markDefunct()); } /** diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index c8fab6b04..a013a5035 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -90,6 +90,7 @@ import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.Cancellable; import static accord.local.KeyHistory.COMMANDS; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; @@ -1060,12 +1061,13 @@ private synchronized void maybeRun() } } - private void enqueueAndRun(Runnable runnable) + private Cancellable enqueueAndRun(Runnable runnable) { boolean result = queue.add(runnable); if (!result) throw illegalState("could not add item to queue"); maybeRun(); + return () -> queue.remove(runnable); } @Override @@ -1086,9 +1088,9 @@ public AsyncChain submit(PreLoadContext context, Function() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - enqueueAndRun(() -> executeInContext(InMemoryCommandStore.Synchronized.this, context, function, callback)); + return enqueueAndRun(() -> executeInContext(InMemoryCommandStore.Synchronized.this, context, function, callback)); } }; } @@ -1099,9 +1101,9 @@ public AsyncChain submit(Callable task) return new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable 
start(BiConsumer callback) { - enqueueAndRun(() -> { + return enqueueAndRun(() -> { try { callback.accept(task.call(), null); diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 5f220ee6f..eda91d8cc 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -27,6 +27,7 @@ import accord.api.Agent; import accord.local.CommandStores.RangesForEpoch; +import accord.primitives.Range; import accord.primitives.Routables; import accord.primitives.Unseekables; import accord.utils.async.AsyncChain; @@ -44,6 +45,7 @@ import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -477,19 +479,38 @@ protected Supplier sync(Node node, Ranges ranges, long epoch) }; } - // TODO (required): replace with a simple wait on suitable exclusive sync point(s) + // TODO (required, correctness): replace with a simple wait on suitable exclusive sync point(s) private void fetchMajorityDeps(AsyncResults.SettableResult coordination, Node node, long epoch, Ranges ranges) + { + AtomicInteger index = new AtomicInteger(); + fetchMajorityDeps(coordination, node, epoch, ranges, index); + } + + // this is temporary until we rely on RX + static final int FETCH_SLICES = Invariants.debug() ? 1 : 64; + private void fetchMajorityDeps(AsyncResults.SettableResult coordination, Node node, long epoch, Ranges ranges, AtomicInteger index) { TxnId id = TxnId.fromValues(epoch - 1, 0, node.id()); Timestamp before = Timestamp.minForEpoch(epoch); - FullRoute route = node.computeRoute(id, ranges); - // TODO (required): we need to ensure anyone we receive a reply from proposes newer timestamps for anything we don't see + int rangeIndex = index.get() / FETCH_SLICES; + int subRangeIndex = index.get() % FETCH_SLICES; + int nextIndex; + Range nextRange; + { + int nextSubRangeIndex = subRangeIndex; + Range selectRange = null; + while (selectRange == null) + selectRange = node.commandStores().shardDistributor().splitRange(ranges.get(rangeIndex), subRangeIndex, ++nextSubRangeIndex, FETCH_SLICES); + nextRange = selectRange; + nextIndex = rangeIndex * FETCH_SLICES + nextSubRangeIndex; } + FullRoute route = node.computeRoute(id, Ranges.of(nextRange)); + logger.debug("Fetching deps to sync epoch {} for range {}", epoch, nextRange); CollectCalculatedDeps.withCalculatedDeps(node, id, route, route, before, (deps, fail) -> { if (fail != null) { - logger.warn("Failed to fetch deps for syncing epoch {} for ranges {}", epoch, ranges, fail); - node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges), 1L, TimeUnit.MINUTES); - node.agent().onUncaughtException(fail); + logger.warn("Failed to fetch deps for syncing epoch {} for range {}", epoch, nextRange, fail); + node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges, index), 1L, TimeUnit.MINUTES); } else { @@ -500,13 +521,16 @@ private void fetchMajorityDeps(AsyncResults.SettableResult coordination, N }).begin((success, fail2) -> { if (fail2 != null) { - logger.warn("Failed to apply deps for syncing epoch {} for ranges {}", epoch, ranges, fail2); - node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges), 1L, TimeUnit.MINUTES); + logger.warn("Failed to apply deps for syncing epoch {}
for range {}", epoch, nextRange, fail2); + node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges, index), 1L, TimeUnit.MINUTES); node.agent().onUncaughtException(fail2); } else { - coordination.setSuccess(null); + int prev = index.getAndSet(nextIndex); + Invariants.checkState(rangeIndex * FETCH_SLICES + subRangeIndex == prev); + if (nextIndex >= ranges.size() * FETCH_SLICES) coordination.setSuccess(null); + else fetchMajorityDeps(coordination, node, epoch, ranges, index); } }); } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index c32b9ecec..e10d80d31 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -707,12 +707,12 @@ public CommandStore select(RoutingKey key) public CommandStore select(Route route) { - return select(ranges -> ranges.intersects(route)); + return select(ranges -> ranges.intersects(route)); } public CommandStore select(Participants participants) { - return select(ranges -> ranges.intersects(participants)); + return select(ranges -> ranges.intersects(participants)); } private CommandStore select(Predicate fn) diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index b434a183f..e91a4b05f 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -121,7 +121,6 @@ public RecoverReply apply(SafeCommandStore safeStore) Deps localDeps = null; if (!command.known().deps.hasCommittedOrDecidedDeps()) { - // TODO (required): consider whether we are safe ignoring the concept of minUnsyncedEpoch here localDeps = calculateDeps(safeStore, txnId, participants, constant(minEpoch), txnId); } diff --git a/accord-core/src/main/java/accord/primitives/LatestDeps.java b/accord-core/src/main/java/accord/primitives/LatestDeps.java index dd65779b8..313eddb3c 100644 --- a/accord-core/src/main/java/accord/primitives/LatestDeps.java +++ b/accord-core/src/main/java/accord/primitives/LatestDeps.java @@ -106,7 +106,10 @@ public LatestEntry(Known.KnownDeps known, Ballot ballot, Deps coordinatedDeps, D static LatestEntry reduce(LatestEntry a, LatestEntry b) { - return reduce(a, b, (v1, v2) -> new LatestEntry(v1.known, v1.ballot, v1.coordinatedDeps, v1.localDeps.with(v2.localDeps))); + return reduce(a, b, (v1, v2) -> new LatestEntry(v1.known, v1.ballot, v1.coordinatedDeps, + v1.localDeps == null ? v2.localDeps + : v2.localDeps == null ? v1.localDeps + : v1.localDeps.with(v2.localDeps))); } static LatestEntry slice(RoutingKey start, RoutingKey end, LatestEntry v) diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java b/accord-core/src/main/java/accord/utils/async/AsyncChain.java index a642321b0..0899b2684 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java @@ -25,6 +25,8 @@ import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.Nullable; + import com.google.common.util.concurrent.ListenableFuture; public interface AsyncChain @@ -126,7 +128,7 @@ default AsyncChain addCallback(Runnable runnable, ExecutorService es) * Causes the chain to begin, starting all work required. This method must be called exactly once, not calling will * not cause any work to start, and calling multiple times will be rejected. 
*/ - void begin(BiConsumer callback); + @Nullable Cancellable begin(BiConsumer callback); default AsyncResult beginAsResult() { diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java index b7e106114..06d8c3673 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChainCombiner.java @@ -75,7 +75,7 @@ private BiConsumer callbackFor(int idx) } @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { List> chains = inputs(); state = new Object[chains.size()]; @@ -83,7 +83,17 @@ protected void start(BiConsumer callback) int size = chains.size(); this.callback = callback; this.remaining = size; + Cancellable cancellable = null; for (int i=0; i { prev.cancel(); next.cancel(); }; + } + } + return cancellable; } } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index 5e488fe7c..ec037b202 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -25,6 +25,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +36,8 @@ import java.util.function.Consumer; import java.util.function.Function; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -135,9 +139,10 @@ public AsyncChain addCallback(BiConsumer callback) } @Override - public void begin(BiConsumer callback) + public Cancellable begin(BiConsumer callback) { addCallback(callback); + return null; } } @@ -149,22 +154,22 @@ protected Head() next = this; } - protected abstract void start(BiConsumer callback); + protected abstract @Nullable Cancellable start(BiConsumer callback); @Override - public final void begin(BiConsumer callback) + public final @Nullable Cancellable begin(BiConsumer callback) { Invariants.checkArgument(next != null); next = null; - start(callback); + return start(callback); } - void begin() + Cancellable begin() { Invariants.checkArgument(next != null); BiConsumer next = this.next; this.next = null; - start(next); + return start(next); } @Override @@ -183,13 +188,13 @@ protected Link(Head head) } @Override - public void begin(BiConsumer callback) + public Cancellable begin(BiConsumer callback) { Invariants.checkArgument(!(callback instanceof AsyncChains.Head)); checkNextIsHead(); Head head = (Head) next; next = callback; - head.begin(); + return head.begin(); } } @@ -369,11 +374,12 @@ private DetectLeak(Consumer onLeak, Runnable onCall) } @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { called.set(true); onCall.run(); callback.accept(null, null); + return null; } @Override @@ -416,9 +422,9 @@ int size() } @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - chain.map(r -> reduceArray(r, reducer)).begin(callback); + return chain.map(r -> reduceArray(r, reducer)).begin(callback); } } @@ -531,29 +537,21 @@ public 
static AsyncChain map(AsyncChain chain, Function new Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - try - { - executor.execute(() -> { - T value; - try - { - value = mapper.apply(v); - } - catch (Throwable t) - { - callback.accept(null, t); - return; - } - callback.accept(value, null); - }); - } - catch (Throwable t) - { - // TODO (low priority, correctness): If the executor is shutdown then the callback may run in an unexpected thread, which may not be thread safe - callback.accept(null, t); - } + return AsyncChains.submit(executor, callback, () -> { + T value; + try + { + value = mapper.apply(v); + } + catch (Throwable t) + { + callback.accept(null, t); + return; + } + callback.accept(value, null); + }); } }); } @@ -563,30 +561,45 @@ public static AsyncChain flatMap(AsyncChain chain, Function new Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - try - { - executor.execute(() -> { - try - { - mapper.apply(v).begin(callback); - } - catch (Throwable t) - { - callback.accept(null, t); - } - }); - } - catch (Throwable t) - { - // TODO (low priority, correctness): If the executor is shutdown then the callback may run in an unexpected thread, which may not be thread safe - callback.accept(null, t); - } + return AsyncChains.submit(executor, callback, () -> { + try + { + mapper.apply(v).begin(callback); + } + catch (Throwable t) + { + callback.accept(null, t); + } + }); } }); } + private static Cancellable submit(Executor executor, BiConsumer callback, Runnable run) + { + try + { + if (executor instanceof ExecutorService) + { + Future future = ((ExecutorService) executor).submit(run); + return () -> future.cancel(true); + } + else + { + executor.execute(run); + return null; + } + } + catch (Throwable t) + { + // TODO (low priority, correctness): If the executor is shutdown then the callback may run in an unexpected thread, which may not be thread safe + callback.accept(null, t); + return null; + } + } + public static AsyncChain ofCallable(Executor executor, Callable callable) { return ofCallable(executor, callable, AsyncChains::encapsulate); @@ -599,16 +612,9 @@ public static AsyncChain ofCallable(Executor executor, return new Head<>() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - try - { - executor.execute(encapsulator.apply(callable, callback)); - } - catch (Throwable t) - { - callback.accept(null, t); - } + return AsyncChains.submit(executor, callback, encapsulator.apply(callable, callback)); } }; } @@ -618,16 +624,9 @@ public static AsyncChain ofRunnable(Executor executor, Runnable runnable) return new Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { - try - { - executor.execute(encapsulate(runnable, callback)); - } - catch (Throwable t) - { - callback.accept(null, t); - } + return AsyncChains.submit(executor, callback, encapsulate(runnable, callback)); } }; } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java b/accord-core/src/main/java/accord/utils/async/AsyncResult.java index 451f2919c..30fd524ff 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java @@ -20,6 +20,8 @@ import java.util.function.BiConsumer; +import javax.annotation.Nullable; + import static accord.utils.Invariants.illegalState; /** @@ -41,10 +43,11 
@@ default AsyncResult addCallback(Runnable runnable) boolean isSuccess(); @Override - default void begin(BiConsumer callback) + default @Nullable Cancellable begin(BiConsumer callback) { //TODO chain shouldn't allow double calling, but should result allow? addCallback(callback); + return null; } @Override diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java index eea32f95e..0382d43b1 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java @@ -132,14 +132,15 @@ protected boolean tryFailure(Throwable throwable) return trySetResult(null, throwable); } - private AsyncChain newChain() + private AsyncChain newChain() { return new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { AbstractResult.this.addCallback(callback); + return null; } }; } @@ -283,9 +284,10 @@ private AsyncChain newChain() return new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { AsyncResults.Immediate.this.addCallback(callback); + return null; } }; } diff --git a/accord-core/src/main/java/accord/utils/async/Cancellable.java b/accord-core/src/main/java/accord/utils/async/Cancellable.java new file mode 100644 index 000000000..74fe4359f --- /dev/null +++ b/accord-core/src/main/java/accord/utils/async/Cancellable.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.utils.async; + +public interface Cancellable +{ + void cancel(); +} diff --git a/accord-core/src/main/java/accord/utils/async/Observable.java b/accord-core/src/main/java/accord/utils/async/Observable.java index c87f12e70..ec810d2b4 100644 --- a/accord-core/src/main/java/accord/utils/async/Observable.java +++ b/accord-core/src/main/java/accord/utils/async/Observable.java @@ -150,9 +150,10 @@ static AsyncChain asChain(Consumer() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { work.accept(forCallback(callback, collector).map(mapper)); + return null; } }; } diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 35482b08d..b3f9370ef 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -68,6 +68,7 @@ import accord.utils.ReflectionUtils.Difference; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.Cancellable; import static accord.utils.Invariants.Paranoia.LINEAR; import static accord.utils.Invariants.ParanoiaCostFactor.HIGH; @@ -244,13 +245,14 @@ public AsyncChain submit(Callable fn) return new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { boolean wasEmpty = pending.isEmpty(); pending.add(task); if (wasEmpty) runNextTask(); task.begin(callback); + return () -> pending.remove(task); } }; } diff --git a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java index 973bf77f0..8f429a3eb 100644 --- a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java +++ b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java @@ -193,8 +193,9 @@ void headRejectsSecondBegin() { AsyncChain chain = new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) { + protected Cancellable start(BiConsumer callback) { callback.accept("success", null); + return null; } }; @@ -207,8 +208,9 @@ void chainRejectsSecondBegin() { AsyncChain chain = new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) { + protected Cancellable start(BiConsumer callback) { callback.accept("success", null); + return null; } }; chain = chain.map(s -> s + " is true"); @@ -254,8 +256,9 @@ void test3() void simpleHeadChain() throws ExecutionException, InterruptedException { AsyncChain chain = new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) { + protected Cancellable start(BiConsumer callback) { callback.accept(0, null); + return null; } }; chain = chain.map(i -> i + 1) @@ -319,9 +322,10 @@ void exceptionHandling() topLevel.add(() -> new AsyncChains.Head() { @Override - protected void start(BiConsumer callback) + protected Cancellable start(BiConsumer callback) { callback.accept(42, null); + return null; } }); topLevel.add(() -> {