Skip to content

Commit

Permalink
Journal with command store
Browse files Browse the repository at this point in the history
  • Loading branch information
ifesdjeen committed Jul 8, 2024
1 parent 694ae39 commit c87c1f8
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ public RangesForEpoch ranges()
@Override
public void registerHistoricalTransactions(Deps deps)
{
RangesForEpoch rangesForEpoch = commandStore.rangesForEpoch;
RangesForEpoch rangesForEpoch = commandStore.rangesForEpoch();
Ranges allRanges = rangesForEpoch.all();
deps.keyDeps.keys().forEach(allRanges, key -> {
deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ public void durable(Command command)
if (command.route() == null)
return;

Ranges coordinateRanges = commandStore.unsafeRangesForEpoch().allAt(command.txnId().epoch());
Ranges coordinateRanges = commandStore.rangesForEpoch().allAt(command.txnId().epoch());
if (!command.status().hasBeen(PreApplied) && command.route().participatesIn(coordinateRanges))
state.recordBlocking(command.txnId(), WaitingToApply, command.route(), null);
if (coordinateRanges.contains(command.route().homeKey()))
Expand Down
17 changes: 14 additions & 3 deletions accord-core/src/main/java/accord/local/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,15 @@ public final boolean isCommitted()
SaveStatus saveStatus = saveStatus();
return saveStatus.hasBeen(Status.Committed) && !saveStatus.hasBeen(Invalidated);
}

public final boolean isStable()
{
SaveStatus saveStatus = saveStatus();
return isStable(saveStatus);
}

public static boolean isStable(SaveStatus saveStatus)
{
return saveStatus.hasBeen(Status.Stable) && !saveStatus.hasBeen(Invalidated);
}

Expand All @@ -666,6 +672,11 @@ public final boolean isTruncated()
return status().hasBeen(Status.Truncated);
}

public static boolean isTruncated(Status status)
{
return status.hasBeen(Status.Truncated);
}

public abstract Command updateAttributes(CommonAttributes attrs, Ballot promised);

public final Command updateAttributes(CommonAttributes attrs)
Expand Down Expand Up @@ -1105,7 +1116,7 @@ public boolean isEqualOrFuller(Command c)
return Objects.equals(acceptedOrCommitted(), that.acceptedOrCommitted());
}

static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
public static Accepted accepted(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted)
{
return validate(new Accepted(common, status, promised, executeAt, accepted));
}
Expand Down Expand Up @@ -1197,7 +1208,7 @@ static Committed committed(Committed command, CommonAttributes common, WaitingOn
return committed(command, common, command.promised(), command.saveStatus(), waitingOn);
}

static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
public static Committed committed(CommonAttributes common, SaveStatus status, Timestamp executeAt, Ballot promised, Ballot accepted, WaitingOn waitingOn)
{
return validate(new Committed(common, status, executeAt, promised, accepted, waitingOn));
}
Expand Down Expand Up @@ -1240,7 +1251,7 @@ public boolean equals(Object o)
if (!super.equals(o)) return false;
Executed executed = (Executed) o;
return Objects.equals(writes, executed.writes)
&& Objects.equals(result, executed.result);
&& Objects.equals(result, executed.result); // TODO: find a different way to check/assert?
}

@Override
Expand Down
52 changes: 30 additions & 22 deletions accord-core/src/main/java/accord/local/CommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@
*/
public abstract class CommandStore implements AgentExecutor
{
static class EpochUpdate
public static class EpochUpdate
{
final long epoch;
final RangesForEpoch newRangesForEpoch;
final RedundantBefore addRedundantBefore;
final Ranges addGlobalRanges;

EpochUpdate(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore, Ranges addGlobalRanges)
EpochUpdate(long epoch, RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore, Ranges addGlobalRanges)
{
this.epoch = epoch;
this.newRangesForEpoch = newRangesForEpoch;
this.addRedundantBefore = addRedundantBefore;
this.addGlobalRanges = addGlobalRanges;
Expand All @@ -96,32 +98,32 @@ static class EpochUpdate
public static class EpochUpdateHolder extends AtomicReference<EpochUpdate>
{
// TODO (required, eventually): support removing ranges
public void updateGlobal(Ranges addGlobalRanges)
public void updateGlobal(long epoch, Ranges addGlobalRanges)
{
EpochUpdate baseUpdate = new EpochUpdate(null, RedundantBefore.EMPTY, addGlobalRanges);
EpochUpdate baseUpdate = new EpochUpdate(epoch, null, RedundantBefore.EMPTY, addGlobalRanges);
EpochUpdate cur = get();
if (cur == null || !compareAndSet(cur, new EpochUpdate(cur.newRangesForEpoch, cur.addRedundantBefore, cur.addGlobalRanges.with(addGlobalRanges))))
if (cur == null || !compareAndSet(cur, new EpochUpdate(epoch, cur.newRangesForEpoch, cur.addRedundantBefore, cur.addGlobalRanges.with(addGlobalRanges))))
set(baseUpdate);
}

// TODO (desired): can better encapsulate by accepting only the newRangesForEpoch and deriving the add/remove ranges
public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges addRanges)
{
RedundantBefore addRedundantBefore = RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.minForEpoch(epoch));
update(newRangesForEpoch, addRedundantBefore);
update(epoch, newRangesForEpoch, addRedundantBefore);
}

public void remove(long epoch, RangesForEpoch newRangesForEpoch, Ranges removeRanges)
{
RedundantBefore addRedundantBefore = RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, TxnId.NONE, TxnId.NONE);
update(newRangesForEpoch, addRedundantBefore);
update(epoch, newRangesForEpoch, addRedundantBefore);
}

private void update(RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore)
private void update(long epoch, RangesForEpoch newRangesForEpoch, RedundantBefore addRedundantBefore)
{
EpochUpdate baseUpdate = new EpochUpdate(newRangesForEpoch, addRedundantBefore, Ranges.EMPTY);
EpochUpdate baseUpdate = new EpochUpdate(epoch, newRangesForEpoch, addRedundantBefore, Ranges.EMPTY);
EpochUpdate cur = get();
if (cur == null || !compareAndSet(cur, new EpochUpdate(newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, addRedundantBefore), cur.addGlobalRanges)))
if (cur == null || !compareAndSet(cur, new EpochUpdate(epoch, newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, addRedundantBefore), cur.addGlobalRanges)))
set(baseUpdate);
}
}
Expand All @@ -144,6 +146,7 @@ CommandStore create(int id,
protected final DataStore store;
protected final ProgressLog progressLog;
protected final EpochUpdateHolder epochUpdateHolder;
private long propatatedUpTo = -1;

// TODO (expected): schedule regular pruning of these collections
// bootstrapBeganAt and shardDurableAt are both canonical data sets mostly used for debugging / constructing
Expand All @@ -153,7 +156,6 @@ CommandStore create(int id,
// TODO (expected): store this only once per node
private DurableBefore durableBefore = DurableBefore.EMPTY;
private MaxConflicts maxConflicts = MaxConflicts.EMPTY;
protected RangesForEpoch rangesForEpoch;

// TODO (desired): merge with redundantBefore?
/**
Expand Down Expand Up @@ -200,21 +202,27 @@ public RangesForEpoch updateRangesForEpoch()
{
EpochUpdate update = epochUpdateHolder.get();
if (update == null)
return rangesForEpoch;
return null;

update = epochUpdateHolder.getAndSet(null);
if (!update.addGlobalRanges.isEmpty())
setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)));
if (update.addRedundantBefore.size() > 0)
setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore));
if (update.newRangesForEpoch != null)
rangesForEpoch = update.newRangesForEpoch;
return rangesForEpoch;
if (propatatedUpTo < update.epoch)
{
if (!update.addGlobalRanges.isEmpty())
setDurableBefore(DurableBefore.merge(durableBefore, DurableBefore.create(update.addGlobalRanges, TxnId.NONE, TxnId.NONE)));
if (update.addRedundantBefore.size() > 0)
setRedundantBefore(RedundantBefore.merge(redundantBefore, update.addRedundantBefore));
propatatedUpTo = update.epoch;
}

return update.newRangesForEpoch;
}

public RangesForEpoch unsafeRangesForEpoch()
public RangesForEpoch rangesForEpoch()
{
return rangesForEpoch;
EpochUpdate update = epochUpdateHolder.get();
if (update == null)
return null;

return update.newRangesForEpoch;
}

public abstract boolean inStore();
Expand Down
4 changes: 2 additions & 2 deletions accord-core/src/main/java/accord/local/CommandStores.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
{
for (ShardHolder shard : prev.shards)
{
shard.store.epochUpdateHolder.updateGlobal(addedGlobal);
shard.store.epochUpdateHolder.updateGlobal(epoch, addedGlobal);
}
}

Expand Down Expand Up @@ -458,7 +458,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder));
shard.ranges = new RangesForEpoch(epoch, addRanges, shard.store);
shard.store.epochUpdateHolder.add(epoch, shard.ranges, addRanges);
shard.store.epochUpdateHolder.updateGlobal(newTopology.ranges());
shard.store.epochUpdateHolder.updateGlobal(newTopology.epoch(), newTopology.ranges());

Map<Boolean, Ranges> partitioned = addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range));
if (partitioned.containsKey(false))
Expand Down
3 changes: 3 additions & 0 deletions accord-core/src/main/java/accord/local/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ protected void start(BiConsumer<? super R, Throwable> callback)

public void reply(Id replyingToNode, ReplyContext replyContext, Reply send, Throwable failure)
{
if (replyingToNode == Id.NONE)
return;

if (failure != null)
{
agent.onUncaughtException(failure);
Expand Down
7 changes: 7 additions & 0 deletions accord-core/src/main/java/accord/utils/Invariants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package accord.utils;

import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;

import net.nicoulaj.compilecommand.annotations.Inline;
Expand Down Expand Up @@ -101,6 +102,12 @@ public static void checkState(boolean condition)
illegalState();
}

public static void checkState(boolean condition, Supplier<String> msg)
{
if (!condition)
illegalState(msg.get());
}

public static void checkState(boolean condition, String msg)
{
if (!condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void validateRead(Command current)
Command reconstructed;
try
{
reconstructed = SerializerSupport.reconstruct(agent(), unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, messages);
reconstructed = SerializerSupport.reconstruct(agent(), rangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, messages);
}
catch (IllegalStateException t)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public CollectMaxAppliedFetchRequest(long sourceEpoch, TxnId syncId, Ranges rang
@Override
protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable)
{
Ranges slice = commandStore.unsafeRangesForEpoch().allAt(txnId).subtract(unavailable);
Ranges slice = commandStore.rangesForEpoch().allAt(txnId).subtract(unavailable);
((InMemoryCommandStore)commandStore).maxAppliedFor((Ranges)readScope, slice).begin((newMaxApplied, failure) -> {
if (failure != null)
{
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/test/java/accord/local/CommandsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static void checkState(int attempt, Collection<Node> values)
// current limitation of SimpleProgressLog is that only the home shard will attempt to recover, so
// non-home shards may stay PreAccepted!
RoutingKey key = command.homeKey();
if (key != null && !store.rangesForEpoch.allAt(command.txnId().epoch()).contains(key))
if (key != null && !store.rangesForEpoch().allAt(command.txnId().epoch()).contains(key))
{
// non-home shard... make sure the state is as expected
if (command.status() == Status.PreAccepted)
Expand Down

0 comments on commit c87c1f8

Please sign in to comment.