From c87c1f807ba500f7a2a1813d1a5abb3ff726e92b Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Mon, 17 Jun 2024 09:53:43 +0200 Subject: [PATCH] Journal with command store --- .../accord/impl/InMemoryCommandStore.java | 2 +- .../java/accord/impl/SimpleProgressLog.java | 2 +- .../src/main/java/accord/local/Command.java | 17 ++++-- .../main/java/accord/local/CommandStore.java | 52 +++++++++++-------- .../main/java/accord/local/CommandStores.java | 4 +- .../src/main/java/accord/local/Node.java | 3 ++ .../main/java/accord/utils/Invariants.java | 7 +++ .../impl/basic/DelayedCommandStores.java | 2 +- .../impl/list/ListFetchCoordinator.java | 2 +- .../test/java/accord/local/CommandsTest.java | 2 +- 10 files changed, 61 insertions(+), 32 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 5ee3f8d5f..3feaa3d18 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -795,7 +795,7 @@ public RangesForEpoch ranges() @Override public void registerHistoricalTransactions(Deps deps) { - RangesForEpoch rangesForEpoch = commandStore.rangesForEpoch; + RangesForEpoch rangesForEpoch = commandStore.rangesForEpoch(); Ranges allRanges = rangesForEpoch.all(); deps.keyDeps.keys().forEach(allRanges, key -> { deps.keyDeps.forEach(key, (txnId, txnIdx) -> { diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java index 0ab1962f6..013578460 100644 --- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java +++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java @@ -604,7 +604,7 @@ public void durable(Command command) if (command.route() == null) return; - Ranges coordinateRanges = commandStore.unsafeRangesForEpoch().allAt(command.txnId().epoch()); + Ranges coordinateRanges = commandStore.rangesForEpoch().allAt(command.txnId().epoch()); if (!command.status().hasBeen(PreApplied) && command.route().participatesIn(coordinateRanges)) state.recordBlocking(command.txnId(), WaitingToApply, command.route(), null); if (coordinateRanges.contains(command.route().homeKey())) diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index 96df4461b..93cc1a690 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -645,9 +645,15 @@ public final boolean isCommitted() SaveStatus saveStatus = saveStatus(); return saveStatus.hasBeen(Status.Committed) && !saveStatus.hasBeen(Invalidated); } + public final boolean isStable() { SaveStatus saveStatus = saveStatus(); + return isStable(saveStatus); + } + + public static boolean isStable(SaveStatus saveStatus) + { return saveStatus.hasBeen(Status.Stable) && !saveStatus.hasBeen(Invalidated); } @@ -666,6 +672,11 @@ public final boolean isTruncated() return status().hasBeen(Status.Truncated); } + public static boolean isTruncated(Status status) + { + return status.hasBeen(Status.Truncated); + } + public abstract Command updateAttributes(CommonAttributes attrs, Ballot promised); public final Command updateAttributes(CommonAttributes attrs) @@ -1105,7 +1116,7 @@ public boolean isEqualOrFuller(Command c) return Objects.equals(acceptedOrCommitted(), that.acceptedOrCommitted()); } - static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted) + public static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted) { return validate(new Accepted(common, status, promised, executeAt, accepted)); } @@ -1197,7 +1208,7 @@ static Committed committed(Committed command, CommonAttributes common, WaitingOn return committed(command, common, command.promised(), command.saveStatus(), waitingOn); } - static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn) + public static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn) { return validate(new Committed(common, status, executeAt, promised, accepted, waitingOn)); } @@ -1240,7 +1251,7 @@ public boolean equals(Object o) if (!super.equals(o)) return false; Executed executed = (Executed) o; return Objects.equals(writes, executed.writes) - && Objects.equals(result, executed.result); + && Objects.equals(result, executed.result); // TODO: find a different way to check/assert? } @Override diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index c7baf2f51..3ad0227d8 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -79,14 +79,16 @@ */ public abstract class CommandStore implements AgentExecutor { - static class EpochUpdate + public static class EpochUpdate { + final long epoch; final RangesForEpoch newRangesForEpoch; final RedundantBefore addRedundantBefore; final Ranges addGlobalRanges; - EpochUpdate(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore, Ranges addGlobalRanges) + EpochUpdate(long epoch, RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore, Ranges addGlobalRanges) { + this.epoch = epoch; this.newRangesForEpoch = newRangesForEpoch; this.addRedundantBefore = addRedundantBefore; this.addGlobalRanges = addGlobalRanges; @@ -96,11 +98,11 @@ static class EpochUpdate public static class EpochUpdateHolder extends AtomicReference { // TODO (required, eventually): support removing ranges - public void updateGlobal(Ranges addGlobalRanges) + public void updateGlobal(long epoch, Ranges addGlobalRanges) { - EpochUpdate baseUpdate = new EpochUpdate(null, RedundantBefore.EMPTY, addGlobalRanges); + EpochUpdate baseUpdate = new EpochUpdate(epoch, null, RedundantBefore.EMPTY, addGlobalRanges); EpochUpdate cur = get(); - if (cur == null || !compareAndSet(cur, new EpochUpdate(cur.newRangesForEpoch, cur.addRedundantBefore, cur.addGlobalRanges.with(addGlobalRanges)))) + if (cur == null || !compareAndSet(cur, new EpochUpdate(epoch, cur.newRangesForEpoch, cur.addRedundantBefore, cur.addGlobalRanges.with(addGlobalRanges)))) set(baseUpdate); } @@ -108,20 +110,20 @@ public void updateGlobal(Ranges addGlobalRanges) public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges addRanges) { RedundantBefore addRedundantBefore = RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.minForEpoch(epoch)); - update(newRangesForEpoch, addRedundantBefore); + update(epoch, newRangesForEpoch, addRedundantBefore); } public void remove(long epoch, RangesForEpoch newRangesForEpoch, Ranges removeRanges) { RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, TxnId.NONE, TxnId.NONE); - update(newRangesForEpoch, addRedundantBefore); + update(epoch, newRangesForEpoch, addRedundantBefore); } - private void update(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore) + private void update(long epoch, RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore) { - EpochUpdate baseUpdate = new EpochUpdate(newRangesForEpoch, addRedundantBefore, Ranges.EMPTY); + EpochUpdate baseUpdate = new EpochUpdate(epoch, newRangesForEpoch, addRedundantBefore, Ranges.EMPTY); EpochUpdate cur = get(); - if (cur == null || !compareAndSet(cur, new EpochUpdate(newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, addRedundantBefore), cur.addGlobalRanges))) + if (cur == null || !compareAndSet(cur, new EpochUpdate(epoch, newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, addRedundantBefore), cur.addGlobalRanges))) set(baseUpdate); } } @@ -144,6 +146,7 @@ CommandStore create(int id, protected final DataStore store; protected final ProgressLog progressLog; protected final EpochUpdateHolder epochUpdateHolder; + private long propatatedUpTo = -1; // TODO (expected): schedule regular pruning of these collections // bootstrapBeganAt and shardDurableAt are both canonical data sets mostly used for debugging / constructing @@ -153,7 +156,6 @@ CommandStore create(int id, // TODO (expected): store this only once per node private DurableBefore durableBefore = DurableBefore.EMPTY; private MaxConflicts maxConflicts = MaxConflicts.EMPTY; - protected RangesForEpoch rangesForEpoch; // TODO (desired): merge with redundantBefore? /** @@ -200,21 +202,27 @@ public RangesForEpoch updateRangesForEpoch() { EpochUpdate update = epochUpdateHolder.get(); if (update == null) - return rangesForEpoch; + return null; - update = epochUpdateHolder.getAndSet(null); - if (!update.addGlobalRanges.isEmpty()) - setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE))); - if (update.addRedundantBefore.size() > 0) - setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore)); - if (update.newRangesForEpoch != null) - rangesForEpoch = update.newRangesForEpoch; - return rangesForEpoch; + if (propatatedUpTo < update.epoch) + { + if (!update.addGlobalRanges.isEmpty()) + setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE))); + if (update.addRedundantBefore.size() > 0) + setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore)); + propatatedUpTo = update.epoch; + } + + return update.newRangesForEpoch; } - public RangesForEpoch unsafeRangesForEpoch() + public RangesForEpoch rangesForEpoch() { - return rangesForEpoch; + EpochUpdate update = epochUpdateHolder.get(); + if (update == null) + return null; + + return update.newRangesForEpoch; } public abstract boolean inStore(); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 20f08a8e0..1cbf4e14c 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -412,7 +412,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top { for (ShardHolder shard : prev.shards) { - shard.store.epochUpdateHolder.updateGlobal(addedGlobal); + shard.store.epochUpdateHolder.updateGlobal(epoch, addedGlobal); } } @@ -458,7 +458,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder)); shard.ranges = new RangesForEpoch(epoch, addRanges, shard.store); shard.store.epochUpdateHolder.add(epoch, shard.ranges, addRanges); - shard.store.epochUpdateHolder.updateGlobal(newTopology.ranges()); + shard.store.epochUpdateHolder.updateGlobal(newTopology.epoch(), newTopology.ranges()); Map partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range)); if (partitioned.containsKey(false)) diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 74446cee9..63738c375 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -558,6 +558,9 @@ protected void start(BiConsumer callback) public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure) { + if (replyingToNode == Id.NONE) + return; + if (failure != null) { agent.onUncaughtException(failure); diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java index 842ed5d39..ad9d8b350 100644 --- a/accord-core/src/main/java/accord/utils/Invariants.java +++ b/accord-core/src/main/java/accord/utils/Invariants.java @@ -19,6 +19,7 @@ package accord.utils; import java.util.function.Predicate; +import java.util.function.Supplier; import javax.annotation.Nullable; import net.nicoulaj.compilecommand.annotations.Inline; @@ -101,6 +102,12 @@ public static void checkState(boolean condition) illegalState(); } + public static void checkState(boolean condition, Supplier msg) + { + if (!condition) + illegalState(msg.get()); + } + public static void checkState(boolean condition, String msg) { if (!condition) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 5ba32268f..deb1d8d5b 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -183,7 +183,7 @@ public void validateRead(Command current) Command reconstructed; try { - reconstructed = SerializerSupport.reconstruct(agent(), unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, messages); + reconstructed = SerializerSupport.reconstruct(agent(), rangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, messages); } catch (IllegalStateException t) { diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index 9f7cc28b7..1bd919751 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -92,7 +92,7 @@ public CollectMaxAppliedFetchRequest(long sourceEpoch, TxnId syncId, Ranges rang @Override protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) { - Ranges slice = commandStore.unsafeRangesForEpoch().allAt(txnId).subtract(unavailable); + Ranges slice = commandStore.rangesForEpoch().allAt(txnId).subtract(unavailable); ((InMemoryCommandStore)commandStore).maxAppliedFor((Ranges)readScope, slice).begin((newMaxApplied, failure) -> { if (failure != null) { diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java index d8a00731f..438bf3fe5 100644 --- a/accord-core/src/test/java/accord/local/CommandsTest.java +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -151,7 +151,7 @@ private static void checkState(int attempt, Collection values) // current limitation of SimpleProgressLog is that only the home shard will attempt to recover, so // non-home shards may stay PreAccepted! RoutingKey key = command.homeKey(); - if (key != null && !store.rangesForEpoch.allAt(command.txnId().epoch()).contains(key)) + if (key != null && !store.rangesForEpoch().allAt(command.txnId().epoch()).contains(key)) { // non-home shard... make sure the state is as expected if (command.status() == Status.PreAccepted)