Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19805: ExclusiveSyncPoints should execute on all intersecting epochs #107

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from
4 changes: 4 additions & 0 deletions accord-core/src/main/java/accord/api/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package accord.api;

import javax.annotation.Nonnull;

import accord.primitives.Range;
import accord.primitives.RoutableKey;
import accord.primitives.Seekable;
Expand All @@ -35,4 +37,6 @@ public interface Key extends Seekable, RoutableKey

@Override
default Range asRange() { throw new UnsupportedOperationException(); }

default int compareAsRoutingKey(@Nonnull RoutingKey that) { return toUnseekable().compareTo(that); }
}
3 changes: 3 additions & 0 deletions accord-core/src/main/java/accord/api/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package accord.api;

import accord.local.SafeCommandStore;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekable;
import accord.primitives.Seekables;
Expand All @@ -33,5 +34,7 @@ public interface Read
Seekables<?, ?> keys();
AsyncChain<Data> read(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
Read slice(Ranges ranges);
Read intersecting(Participants<?> participants);
Read merge(Read other);

}
3 changes: 3 additions & 0 deletions accord-core/src/main/java/accord/api/Update.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package accord.api;

import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
Expand All @@ -35,5 +36,7 @@ public interface Update
// null is provided only if nothing was read
Write apply(Timestamp executeAt, @Nullable Data data);
Update slice(Ranges ranges);
Update intersecting(Participants<?> participants);
Update merge(Update other);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package accord.coordinate;

import java.util.LinkedHashMap;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -103,7 +103,7 @@ else if (tracker.recordSuccess(from) == Success)
private Topologies topologies;
private boolean initialRoundIsDone;
private ExtraEpochs extraEpochs;
private Map<Id, Object> debug = Invariants.debug() ? new LinkedHashMap<>() : null;
private Map<Id, Object> debug = Invariants.debug() ? new TreeMap<>() : null;

AbstractCoordinatePreAccept(Node node, FullRoute<?> route, TxnId txnId)
{
Expand All @@ -124,7 +124,7 @@ final void start()
}

abstract Seekables<?, ?> keysOrRanges();
abstract void contact(Set<Id> nodes, Topologies topologies, Callback<R> callback);
abstract void contact(Collection<Id> nodes, Topologies topologies, Callback<R> callback);
abstract void onSuccessInternal(Id from, R reply);
/**
* The tracker for the extra rounds only is provided by the AbstractCoordinatePreAccept, so we expect a boolean back
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/coordinate/Barrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class ExistingTransactionCheck extends AsyncResults.AbstractResult<BarrierTxn> i
@Override
public BarrierTxn apply(SafeCommandStore safeStore)
{
// TODO (required): consider these semantics
// TODO (required, consider): consider these semantics
BarrierTxn found = safeStore.mapReduceFull(
seekables,
safeStore.ranges().allAfter(minEpoch),
Expand Down
19 changes: 10 additions & 9 deletions accord-core/src/main/java/accord/coordinate/CollectDeps.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.function.BiConsumer;

import accord.api.RoutingKey;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.CommandStore;
import accord.local.Node;
Expand All @@ -39,7 +40,7 @@ public class CollectDeps implements Callback<GetDepsOk>
{
final Node node;
final TxnId txnId;
final Route<?> route;
final RoutingKey homeKey;

final Timestamp executeAt;

Expand All @@ -48,25 +49,25 @@ public class CollectDeps implements Callback<GetDepsOk>
private final BiConsumer<Deps, Throwable> callback;
private boolean isDone;

CollectDeps(Node node, Topologies topologies, TxnId txnId, FullRoute<?> route, Timestamp executeAt, BiConsumer<Deps, Throwable> callback)
CollectDeps(Node node, Topologies topologies, TxnId txnId, RoutingKey homeKey, Timestamp executeAt, BiConsumer<Deps, Throwable> callback)
{
this.node = node;
this.txnId = txnId;
this.route = route;
this.homeKey = homeKey;
this.executeAt = executeAt;
this.callback = callback;
this.oks = new ArrayList<>();
this.tracker = new QuorumTracker(topologies);
}

public static void withDeps(Node node, TxnId txnId, FullRoute<?> route, Seekables<?, ?> keysOrRanges, Timestamp executeAt, BiConsumer<Deps, Throwable> callback)
public static void withDeps(Node node, TxnId txnId, FullRoute<?> fullRoute, Unseekables<?> sendTo, Seekables<?, ?> keysOrRanges, Timestamp executeAt, BiConsumer<Deps, Throwable> callback)
{
Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, executeAt);
CollectDeps collect = new CollectDeps(node, topologies, txnId, route, executeAt, callback);
Topologies topologies = node.topology().withUnsyncedEpochs(sendTo, txnId, executeAt);
CollectDeps collect = new CollectDeps(node, topologies, txnId, fullRoute.homeKey(), executeAt, callback);
CommandStore store = CommandStore.maybeCurrent();
if (store == null)
store = node.commandStores().select(route);
node.send(collect.tracker.nodes(), to -> new GetDeps(to, topologies, route, txnId, keysOrRanges, executeAt),
store = node.commandStores().select(fullRoute);
node.send(collect.tracker.nodes(), to -> new GetDeps(to, topologies, fullRoute, txnId, keysOrRanges, executeAt),
store, collect);
}

Expand All @@ -90,7 +91,7 @@ public void onFailure(Id from, Throwable failure)
if (tracker.recordFailure(from) == Failed)
{
isDone = true;
callback.accept(null, new Timeout(txnId, route.homeKey()));
callback.accept(null, new Timeout(txnId, homeKey));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package accord.coordinate;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import accord.api.Result;
import accord.coordinate.tracking.QuorumTracker;
Expand Down Expand Up @@ -90,7 +90,7 @@ public static AsyncResult<Result> coordinate(Node node, FullRoute<?> route, TxnI
}

@Override
void contact(Set<Node.Id> nodes, Topologies topologies, Callback<GetEphemeralReadDepsOk> callback)
void contact(Collection<Node.Id> nodes, Topologies topologies, Callback<GetEphemeralReadDepsOk> callback)
{
CommandStore commandStore = CommandStore.maybeCurrent();
if (commandStore == null) commandStore = node.commandStores().select(route.homeKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package accord.coordinate;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import accord.coordinate.tracking.FastPathTracker;
import accord.local.CommandStore;
Expand Down Expand Up @@ -61,13 +61,13 @@ abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, Pre

CoordinatePreAccept(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Topologies topologies)
{
super(node, route, txnId);
super(node, route, txnId, topologies);
this.tracker = new FastPathTracker(topologies);
this.oks = new ArrayList<>(topologies.estimateUniqueNodes());
this.txn = txn;
}

void contact(Set<Id> nodes, Topologies topologies, Callback<PreAcceptReply> callback)
void contact(Collection<Id> nodes, Topologies topologies, Callback<PreAcceptReply> callback)
{
// TODO (desired, efficiency): consider sending only to electorate of most recent topology (as only these PreAccept votes matter)
// note that we must send to all replicas of old topology, as electorate may not be reachable
Expand Down Expand Up @@ -141,7 +141,7 @@ void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
* We cannot execute the transaction because the execution epoch's topology no longer contains all of the
* participating keys/ranges, so we propose that the transaction is invalidated in its coordination epoch
*/
Propose.Invalidate.proposeInvalidate(node, new Ballot(node.uniqueNow()), txnId, route.someParticipatingKey(), (outcome, failure) -> {
Propose.Invalidate.proposeInvalidate(node, new Ballot(node.uniqueNow()), txnId, route.homeKey(), (outcome, failure) -> {
if (failure != null)
mismatch.addSuppressed(failure);
accept(null, mismatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package accord.coordinate;

import accord.coordinate.ExecuteSyncPoint.ExecuteExclusiveSyncPoint;
import accord.coordinate.tracking.AppliedTracker;
import accord.local.Node;
import accord.messages.Callback;
Expand All @@ -28,11 +29,18 @@
import accord.primitives.SyncPoint;
import accord.utils.async.AsyncResult;

public class CoordinateShardDurable extends ExecuteSyncPoint<Ranges> implements Callback<ReadReply>
public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint implements Callback<ReadReply>
{
private CoordinateShardDurable(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
{
super(node, new AppliedTracker(node.topology().forEpoch(exclusiveSyncPoint.keysOrRanges, exclusiveSyncPoint.sourceEpoch())), exclusiveSyncPoint);
super(node, exclusiveSyncPoint, AppliedTracker::new);
addCallback((success, fail) -> {
if (fail == null)
{
node.configService().reportEpochRedundant(syncPoint.keysOrRanges, syncPoint.syncId.epoch());
node.send(tracker.nodes(), new SetShardDurable(syncPoint));
}
});
}

public static AsyncResult<SyncPoint<Ranges>> coordinate(Node node, SyncPoint<Ranges> exclusiveSyncPoint)
Expand All @@ -46,12 +54,4 @@ protected void start()
{
node.send(tracker.nodes(), to -> new WaitUntilApplied(to, tracker.topologies(), syncPoint.syncId, syncPoint.keysOrRanges, syncPoint.syncId.epoch()), this);
}

@Override
protected void onSuccess()
{
node.configService().reportEpochRedundant(syncPoint.keysOrRanges, syncPoint.syncId.epoch());
node.send(tracker.nodes(), new SetShardDurable(syncPoint));
super.onSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import accord.coordinate.CoordinationAdapter.Adapters;
import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.PreAccept.PreAcceptOk;
Expand All @@ -38,10 +39,7 @@
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;

import static accord.coordinate.CoordinationAdapter.Invoke.execute;
import static accord.coordinate.CoordinationAdapter.Invoke.propose;
import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
import static accord.primitives.Timestamp.mergeMax;
Expand All @@ -62,9 +60,9 @@ public class CoordinateSyncPoint<S extends Seekables<?, ?>> extends CoordinatePr

final CoordinationAdapter<SyncPoint<S>> adapter;

private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route, CoordinationAdapter<SyncPoint<S>> adapter)
private CoordinateSyncPoint(Node node, TxnId txnId, Topologies topologies, Txn txn, FullRoute<?> route, CoordinationAdapter<SyncPoint<S>> adapter)
{
super(node, txnId, txn, route, node.topology().withOpenEpochs(route, txnId, txnId));
super(node, txnId, txn, route, topologies);
this.adapter = adapter;
}

Expand All @@ -88,21 +86,18 @@ private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route,
return coordinate(node, Kind.SyncPoint, keysOrRanges, Adapters.inclusiveSyncPointBlocking());
}

public static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, CoordinationAdapter<SyncPoint<S>> adapter)
public static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, Kind kind, S keysOrRanges, SyncPointAdapter<S> adapter)
{
checkArgument(kind == Kind.SyncPoint || kind == ExclusiveSyncPoint);
TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
return node.withEpoch(txnId.epoch(), () -> coordinate(node, txnId, keysOrRanges, adapter)).beginAsResult();
}

private static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, TxnId txnId, S keysOrRanges, CoordinationAdapter<SyncPoint<S>> adapter)
private static <S extends Seekables<?, ?>> AsyncResult<SyncPoint<S>> coordinate(Node node, TxnId txnId, S keysOrRanges, SyncPointAdapter<S> adapter)
{
checkArgument(txnId.kind() == Kind.SyncPoint || txnId.kind() == ExclusiveSyncPoint);
FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(txnId.epoch()), txnId, route.homeKey(), keysOrRanges);
if (mismatch != null)
return AsyncResults.failure(mismatch);
CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint<>(node, txnId, node.agent().emptyTxn(txnId.kind(), keysOrRanges), route, adapter);
CoordinateSyncPoint<S> coordinate = new CoordinateSyncPoint<>(node, txnId, adapter.forDecision(node, route, txnId), node.agent().emptyTxn(txnId.kind(), keysOrRanges), route, adapter);
coordinate.start();
return coordinate;
}
Expand All @@ -124,13 +119,10 @@ void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk>
}
else
{
// we don't need to fetch deps from Accept replies, so we don't need to contact unsynced epochs
topologies = node.topology().forEpoch(route, txnId.epoch());
// TODO (required): consider the required semantics of a SyncPoint
if (tracker.hasFastPathAccepted() && txnId.kind() == Kind.SyncPoint)
execute(adapter, node, topologies, route, FAST, txnId, txn, txnId, deps, this);
adapter.execute(node, topologies, route, FAST, txnId, txn, txnId, deps, this);
else
propose(adapter, node, topologies, route, Ballot.ZERO, txnId, txn, executeAt, deps, this);
adapter.propose(node, topologies, route, Ballot.ZERO, txnId, txn, executeAt, deps, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import accord.utils.async.AsyncResults;

import static accord.coordinate.CoordinationAdapter.Factory.Step.Continue;
import static accord.coordinate.CoordinationAdapter.Invoke.execute;
import static accord.coordinate.CoordinationAdapter.Invoke.propose;
import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;

Expand Down Expand Up @@ -73,7 +71,7 @@ void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk>
if (tracker.hasFastPathAccepted())
{
Deps deps = Deps.merge(oks, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
execute(executeAdapter(), node, topologies, route, FAST, txnId, txn, txnId, deps, settingCallback());
executeAdapter().execute(node, topologies, route, FAST, txnId, txn, txnId, deps, settingCallback());
node.agent().metricsEventsListener().onFastPathTaken(txnId, deps);
}
else
Expand All @@ -93,7 +91,7 @@ void onPreAccepted(Topologies topologies, Timestamp executeAt, List<PreAcceptOk>
if (PreAccept.rejectExecuteAt(txnId, topologies))
proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
else
propose(proposeAdapter(), node, topologies, route, Ballot.ZERO, txnId, txn, executeAt, deps, this);
proposeAdapter().propose(node, topologies, route, Ballot.ZERO, txnId, txn, executeAt, deps, this);
}

node.agent().metricsEventsListener().onSlowPathTaken(txnId, deps);
Expand Down
Loading