Skip to content

Commit

Permalink
Backport to branch(3.10) : Fix snapshot management issues (#2024)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnmt authored Jul 2, 2024
1 parent c1896ce commit 1adc6c7
Show file tree
Hide file tree
Showing 7 changed files with 768 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudException;
Expand All @@ -19,8 +20,11 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -63,30 +67,35 @@ public CrudHandler(
this.mutationConditionsValidator = mutationConditionsValidator;
}

public Optional<Result> get(Get get) throws CrudException {
List<String> originalProjections = new ArrayList<>(get.getProjections());
public Optional<Result> get(Get originalGet) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalGet.getProjections());
Get get = (Get) prepareStorageSelection(originalGet);

Optional<TransactionResult> result;
Snapshot.Key key = new Snapshot.Key(get);

if (snapshot.containsKeyInReadSet(key)) {
return createGetResult(key, originalProjections);
if (snapshot.containsKeyInGetSet(get)) {
return createGetResult(key, get, originalProjections);
}

result = getFromStorage(get);
if (!result.isPresent() || result.get().isCommitted()) {
// Keep the read set latest to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, result);
return createGetResult(key, originalProjections);
snapshot.put(get, result); // for re-read and validation
return createGetResult(key, get, originalProjections);
}
throw new UncommittedRecordException(
result.get(), "this record needs recovery", snapshot.getId());
}

private Optional<Result> createGetResult(Snapshot.Key key, List<String> projections)
private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String> projections)
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.get(key)
.mergeResult(key, snapshot.get(get))
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -104,20 +113,22 @@ public List<Result> scan(Scan scan) throws CrudException {
return results;
}

private List<Result> scanInternal(Scan scan) throws CrudException {
List<String> originalProjections = new ArrayList<>(scan.getProjections());
private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

List<Result> results = new ArrayList<>();
Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<List<Snapshot.Key>> keysInSnapshot = snapshot.get(scan);
if (keysInSnapshot.isPresent()) {
for (Snapshot.Key key : keysInSnapshot.get()) {
snapshot.get(key).ifPresent(results::add);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
}

List<Snapshot.Key> keys = new ArrayList<>();
Scanner scanner = null;
try {
scanner = getFromStorage(scan);
Expand All @@ -130,12 +141,12 @@ private List<Result> scanInternal(Scan scan) throws CrudException {

Snapshot.Key key = new Snapshot.Key(scan, r);

if (!snapshot.containsKeyInReadSet(key)) {
snapshot.put(key, Optional.of(result));
}
// Always overwrite the read-set entry with the latest record (result) so that the before image
// is created from it; another conflicting transaction might have updated the record after this
// transaction first read it.
snapshot.put(key, Optional.of(result));

keys.add(key);
snapshot.get(key).ifPresent(results::add);
snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -146,15 +157,16 @@ private List<Result> scanInternal(Scan scan) throws CrudException {
}
}
}
snapshot.put(scan, keys);
snapshot.put(scan, results);

return createScanResults(scan, originalProjections, results);
}

private List<Result> createScanResults(Scan scan, List<String> projections, List<Result> results)
private List<Result> createScanResults(
Scan scan, List<String> projections, Map<Snapshot.Key, TransactionResult> results)
throws CrudException {
TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get());
return results.stream()
return results.values().stream()
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled))
.collect(Collectors.toList());
}
Expand All @@ -171,37 +183,38 @@ public void delete(Delete delete) throws UnsatisfiedConditionException {
snapshot.put(new Snapshot.Key(delete), delete);
}

private Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
@VisibleForTesting
Optional<TransactionResult> getFromStorage(Get get) throws CrudException {
try {
get.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames();
get.withProjections(afterImageColumnNames);
}
get.withConsistency(Consistency.LINEARIZABLE);
return storage.get(get).map(TransactionResult::new);
} catch (ExecutionException e) {
throw new CrudException("get failed", e, snapshot.getId());
}
}

private Scanner getFromStorage(Scan scan) throws CrudException {
@VisibleForTesting
Scanner getFromStorage(Scan scan) throws CrudException {
try {
return storage.scan(scan);
} catch (ExecutionException e) {
throw new CrudException("scan failed", e, snapshot.getId());
}
}

private Selection prepareStorageSelection(Selection selection) throws CrudException {
try {
scan.clearProjections();
selection.clearProjections();
// Retrieve only the after-image columns when including the metadata is disabled; otherwise,
// retrieve all the columns
if (!isIncludeMetadataEnabled) {
LinkedHashSet<String> afterImageColumnNames =
tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames();
scan.withProjections(afterImageColumnNames);
tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames();
selection.withProjections(afterImageColumnNames);
}
scan.withConsistency(Consistency.LINEARIZABLE);
return storage.scan(scan);
selection.withConsistency(Consistency.LINEARIZABLE);
return selection;
} catch (ExecutionException e) {
throw new CrudException("scan failed", e, snapshot.getId());
throw new CrudException("getting a table metadata failed", e, snapshot.getId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class Snapshot {
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;
private final Map<Key, Optional<TransactionResult>> readSet;
private final Map<Scan, List<Key>> scanSet;
private final Map<Get, Optional<TransactionResult>> getSet;
private final Map<Scan, Map<Key, TransactionResult>> scanSet;
private final Map<Key, Put> writeSet;
private final Map<Key, Delete> deleteSet;

Expand All @@ -65,6 +66,7 @@ public Snapshot(
this.tableMetadataManager = tableMetadataManager;
this.parallelExecutor = parallelExecutor;
readSet = new HashMap<>();
getSet = new HashMap<>();
scanSet = new HashMap<>();
writeSet = new HashMap<>();
deleteSet = new HashMap<>();
Expand All @@ -78,7 +80,8 @@ public Snapshot(
TransactionTableMetadataManager tableMetadataManager,
ParallelExecutor parallelExecutor,
Map<Key, Optional<TransactionResult>> readSet,
Map<Scan, List<Key>> scanSet,
Map<Get, Optional<TransactionResult>> getSet,
Map<Scan, Map<Key, TransactionResult>> scanSet,
Map<Key, Put> writeSet,
Map<Key, Delete> deleteSet) {
this.id = id;
Expand All @@ -87,6 +90,7 @@ public Snapshot(
this.tableMetadataManager = tableMetadataManager;
this.parallelExecutor = parallelExecutor;
this.readSet = readSet;
this.getSet = getSet;
this.scanSet = scanSet;
this.writeSet = writeSet;
this.deleteSet = deleteSet;
Expand All @@ -107,8 +111,12 @@ public void put(Key key, Optional<TransactionResult> result) {
readSet.put(key, result);
}

public void put(Scan scan, List<Key> keys) {
scanSet.put(scan, keys);
/**
 * Records the result of a get operation in the get set, keyed by the {@code Get} itself.
 *
 * @param get the get operation used as the key
 * @param result the result to associate with {@code get}
 */
public void put(Get get, Optional<TransactionResult> result) {
// Map#put semantics: an existing entry for an equal Get is replaced.
getSet.put(get, result);
}

/**
 * Records the results of a scan operation in the scan set, keyed by the {@code Scan} itself.
 *
 * @param scan the scan operation used as the key
 * @param results the per-key results to associate with {@code scan}
 */
public void put(Scan scan, Map<Key, TransactionResult> results) {
// The given map is stored as-is (no copy is made here).
scanSet.put(scan, results);
}

public void put(Key key, Put put) {
Expand Down Expand Up @@ -137,21 +145,18 @@ public Optional<TransactionResult> getFromReadSet(Key key) {
return readSet.containsKey(key) ? readSet.get(key) : Optional.empty();
}

public Optional<TransactionResult> get(Key key) throws CrudException {
public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
throws CrudException {
if (deleteSet.containsKey(key)) {
return Optional.empty();
} else if (readSet.containsKey(key)) {
if (writeSet.containsKey(key)) {
// merge the result in the read set and the put in the write set
return Optional.of(
new TransactionResult(
new MergedResult(readSet.get(key), writeSet.get(key), getTableMetadata(key))));
} else {
return readSet.get(key);
}
} else if (writeSet.containsKey(key)) {
// merge the result in the read set and the put in the write set
return Optional.of(
new TransactionResult(
new MergedResult(result, writeSet.get(key), getTableMetadata(key))));
} else {
return result;
}
throw new IllegalArgumentException(
"getting data neither in the read set nor the delete set is not allowed");
}

private TableMetadata getTableMetadata(Key key) throws CrudException {
Expand Down Expand Up @@ -185,7 +190,17 @@ private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
}
}

public Optional<List<Key>> get(Scan scan) {
/**
 * Tells whether the get set already holds an entry for the given get operation.
 *
 * @param get the get operation to look up
 * @return {@code true} if the get set contains {@code get} as a key; {@code false} otherwise
 */
public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

/**
 * Returns the value stored in the get set for the given get operation.
 *
 * @param get the get operation to look up
 * @return the value the get set holds for {@code get}
 */
public Optional<TransactionResult> get(Get get) {
// This method is expected to be called only after the result of the get operation has been put
// in the get set.
assert getSet.containsKey(get);
// NOTE(review): the assert is checked only when assertions are enabled; without it, a missing
// entry makes Map#get return null rather than an empty Optional. Confirm that every caller
// puts the result first.
return getSet.get(get);
}

public Optional<Map<Key, TransactionResult>> get(Scan scan) {
if (scanSet.containsKey(scan)) {
return Optional.ofNullable(scanSet.get(scan));
}
Expand Down Expand Up @@ -222,6 +237,10 @@ private boolean isWriteSetOverlappedWith(Scan scan) {
}

for (Map.Entry<Key, Put> entry : writeSet.entrySet()) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Put put = entry.getValue();

if (!put.forNamespace().equals(scan.forNamespace())
Expand Down Expand Up @@ -278,7 +297,7 @@ private boolean isWriteSetOverlappedWith(Scan scan) {

private boolean isWriteSetOverlappedWith(ScanWithIndex scan) {
for (Map.Entry<Key, Put> entry : writeSet.entrySet()) {
if (scanSet.get(scan).contains(entry.getKey())) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Expand Down Expand Up @@ -315,7 +334,7 @@ private boolean isWriteSetOverlappedWith(ScanAll scan) {
// yet. Thus, we need to evaluate if the scan condition potentially matches put operations.

// Check for cases 1 and 2
if (scanSet.get(scan).contains(entry.getKey())) {
if (scanSet.get(scan).containsKey(entry.getKey())) {
return true;
}

Expand Down Expand Up @@ -433,14 +452,14 @@ void toSerializableWithExtraRead(DistributedStorage storage)
List<ParallelExecutorTask> tasks = new ArrayList<>();

// Read set by scan is re-validated to check if there is no anti-dependency
for (Map.Entry<Scan, List<Key>> entry : scanSet.entrySet()) {
for (Map.Entry<Scan, Map<Key, TransactionResult>> entry : scanSet.entrySet()) {
tasks.add(
() -> {
Map<Key, TransactionResult> currentReadMap = new HashMap<>();
Set<Key> validatedReadSet = new HashSet<>();
Scanner scanner = null;
Scan scan = entry.getKey();
try {
Scan scan = entry.getKey();
// only get tx_id and tx_version columns because we use only them to compare
scan.clearProjections();
scan.withProjection(Attribute.ID).withProjection(Attribute.VERSION);
Expand All @@ -464,13 +483,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
}
}

for (Key key : entry.getValue()) {
for (Map.Entry<Key, TransactionResult> e : entry.getValue().entrySet()) {
Key key = e.getKey();
TransactionResult result = e.getValue();
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
continue;
}
// Check if read records are not changed
TransactionResult latestResult = currentReadMap.get(key);
if (isChanged(Optional.ofNullable(latestResult), readSet.get(key))) {
if (isChanged(Optional.ofNullable(latestResult), Optional.of(result))) {
throwExceptionDueToAntiDependency();
}
validatedReadSet.add(key);
Expand All @@ -483,35 +504,23 @@ void toSerializableWithExtraRead(DistributedStorage storage)
});
}

// Calculate read set validated by scan
Set<Key> validatedReadSetByScan = new HashSet<>();
for (List<Key> values : scanSet.values()) {
validatedReadSetByScan.addAll(values);
}

// Read set by get is re-validated to check if there is no anti-dependency
for (Map.Entry<Key, Optional<TransactionResult>> entry : readSet.entrySet()) {
Key key = entry.getKey();
if (writeSet.containsKey(key)
|| deleteSet.containsKey(key)
|| validatedReadSetByScan.contains(key)) {
for (Map.Entry<Get, Optional<TransactionResult>> entry : getSet.entrySet()) {
Get get = entry.getKey();
Key key = new Key(get);
if (writeSet.containsKey(key) || deleteSet.containsKey(key)) {
continue;
}

tasks.add(
() -> {
Optional<TransactionResult> originalResult = getSet.get(get);
// only get tx_id and tx_version columns because we use only them to compare
Get get =
new Get(key.getPartitionKey(), key.getClusteringKey().orElse(null))
.withProjection(Attribute.ID)
.withProjection(Attribute.VERSION)
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(key.getNamespace())
.forTable(key.getTable());

get.clearProjections();
get.withProjection(Attribute.ID).withProjection(Attribute.VERSION);
Optional<TransactionResult> latestResult = storage.get(get).map(TransactionResult::new);
// Check if a read record is not changed
if (isChanged(latestResult, entry.getValue())) {
if (isChanged(latestResult, originalResult)) {
throwExceptionDueToAntiDependency();
}
});
Expand Down
Loading

0 comments on commit 1adc6c7

Please sign in to comment.