Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C19288 accord async reads are unsafe #98

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions accord-core/src/main/java/accord/messages/ReadData.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,13 @@ void cancel(@Nullable SafeCommandStore safeStore)
if (safeCommand != null) safeCommand.removeListener(this);
waitingOn.clear(safeStore.commandStore().id());
}

// TODO (expected): efficient unsubscribe mechanism
node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> {
SafeCommand safeCommand = in.ifInitialised(txnId);
if (safeCommand != null) safeCommand.removeListener(this);
}, node.agent()));
if (waitingOn != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you hit a NPE? since we touch waitingOn.stream() we would hit a NPE.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, if there are multiple calls to cancel it will throw an NPE

node.commandStores().mapReduceConsume(this, waitingOn.stream(), forEach(in -> {
SafeCommand safeCommand = in.ifInitialised(txnId);
if (safeCommand != null) safeCommand.removeListener(this);
}, node.agent()));
state = State.OBSOLETE;
waitingOn = null;
reading = null;
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/test/java/accord/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static Txn listWriteTxn(Node.Id client, Keys keys)
for (Key k : keys)
update.put(k, 1);
ListRead read = new ListRead(Function.identity(), false, keys, keys);
ListQuery query = new ListQuery(client, keys.size());
ListQuery query = new ListQuery(client, keys.size(), false);
return new Txn.InMemory(keys, read, query, update);
}

Expand Down
4 changes: 2 additions & 2 deletions accord-core/src/test/java/accord/burn/BurnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static List<Packet> generate(RandomSource random, MessageListener listener, Func
requestRanges.add(nextRange.apply(prefixes));
Ranges ranges = Ranges.of(requestRanges.toArray(EMPTY_RANGES));
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, false, ranges, ranges);
ListQuery query = new ListQuery(client, finalCount);
ListQuery query = new ListQuery(client, finalCount, false);
return new Txn.InMemory(ranges, read, query);
};
}
Expand Down Expand Up @@ -183,7 +183,7 @@ static List<Packet> generate(RandomSource random, MessageListener listener, Func
if (isWrite)
requestKeys.addAll(update.keySet());
ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, kind == EphemeralRead, readKeys, new Keys(requestKeys));
ListQuery query = new ListQuery(client, finalCount);
ListQuery query = new ListQuery(client, finalCount, kind == EphemeralRead);
return new Txn.InMemory(kind, new Keys(requestKeys), read, query, update);
};
}
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/test/java/accord/impl/list/ListAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public long preAcceptTimeout()
@Override
public Txn emptyTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
{
return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE), null);
return new Txn.InMemory(kind, keysOrRanges, new ListRead(identity(), false, Keys.EMPTY, Keys.EMPTY), new ListQuery(NONE, Integer.MIN_VALUE, false), null);
}

public boolean collectMaxApplied()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ListFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataS
@Override
protected PartialTxn rangeReadTxn(Ranges ranges)
{
return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), false, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null);
return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), false, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE, false), null);
}

@Override
Expand Down
10 changes: 9 additions & 1 deletion accord-core/src/test/java/accord/impl/list/ListQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.Timestamped;
import javax.annotation.Nonnull;

public class ListQuery implements Query
{
final Id client;
final long requestId;
final boolean isEphemeralRead;

public ListQuery(Id client, long requestId)
public ListQuery(Id client, long requestId, boolean isEphemeralRead)
{
this.client = client;
this.requestId = requestId;
this.isEphemeralRead = isEphemeralRead;
}

@Override
Expand All @@ -58,7 +61,12 @@ public Result compute(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnu
{
int i = responseKeys.indexOf(e.getKey());
if (i >= 0)
{
Timestamp timestamp = e.getValue().timestamp;
Invariants.checkState(isEphemeralRead || timestamp.compareTo(executeAt) < 0,
"Data timestamp %s >= execute at %s", timestamp, executeAt);
values[i] = e.getValue().data;
}
}
return new ListResult(ListResult.Status.Applied, client, requestId, txnId, read.userReadKeys, responseKeys, values, (ListUpdate) update);
}
Expand Down
50 changes: 23 additions & 27 deletions accord-core/src/test/java/accord/impl/list/ListRead.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import accord.local.SafeCommandStore;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.Timestamped;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,33 +66,30 @@ public ListRead(Function<? super CommandStore, AsyncExecutor> executor, boolean
@Override
public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store)
{
// read synchronously, logically taking a snapshot, so we can impose our invariant of not reading the future
ListStore s = (ListStore)store;
Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
// TODO (now, correctness): move the read into the executor thread to match real impl
// There is a bug (link jira) where the stale read handle logic no longer detects and fails with the new assert below
// There is a comment early about running synchronously, but this isn't easy for different implementations so should likely
// be an optimization impl take rather than a foundational requirement...
ListData result = new ListData();
switch (key.domain())
{
default: throw new AssertionError();
case Key:
if (!keys.contains((Key)key))
throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys);
Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key);
logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
Invariants.checkState(isEphemeralRead || data.timestamp.compareTo(executeAt) < 0,
"Data timestamp %s >= execute at %s", data.timestamp, executeAt);
result.put((Key)key, data);
break;
case Range:
if (!keys.containsAll(Ranges.single((Range)key)))
throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys);
for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key))
result.put(e.getKey(), e.getValue());
}
return executor.apply(safeStore.commandStore()).submit(() -> result);
logger.trace("submitting READ on {} at {} key:{}", s.node, executeAt, key);
return executor.apply(safeStore.commandStore()).submit(() -> {
Ranges unavailable = safeStore.ranges().unsafeToReadAt(executeAt);
ListData result = new ListData();
switch (key.domain())
{
default: throw new AssertionError();
case Key:
if (!keys.contains((Key)key))
throw new IllegalArgumentException("Attempted to read key " + key + " which is outside of the expected range " + keys);
Timestamped<int[]> data = s.get(unavailable, executeAt, (Key)key);
logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
result.put((Key)key, data);
break;
case Range:
if (!keys.containsAll(Ranges.single((Range)key)))
throw new IllegalArgumentException("Attempted to read range " + key + " which is outside of the expected range " + keys);
for (Map.Entry<Key, Timestamped<int[]>> e : s.get(unavailable, executeAt, (Range)key))
result.put(e.getKey(), e.getValue());
}
return result;
});

}

@Override
Expand Down
1 change: 1 addition & 0 deletions accord-core/src/test/java/accord/impl/list/ListWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public AsyncChain<Void> apply(Seekable key, SafeCommandStore safeStore, Timestam
return Writes.SUCCESS;
TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?, ?, ?>) safeStore, (Key) key, executeAt, true);

logger.trace("submitting WRITE on {} at {} key:{}", s.node, executeAt, key);
return executor.apply(safeStore.commandStore()).submit(() -> {
int[] data = get(key);
s.data.merge((Key)key, new Timestamped<>(executeAt, data, Arrays::toString), ListStore::merge);
Expand Down