diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 3a6839aa0a..d59553c3d6 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -11,6 +11,7 @@ import com.scalar.db.api.Result; import com.scalar.db.api.Scan; import com.scalar.db.api.Scanner; +import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; @@ -19,8 +20,11 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; @@ -63,30 +67,35 @@ public CrudHandler( this.mutationConditionsValidator = mutationConditionsValidator; } - public Optional get(Get get) throws CrudException { - List originalProjections = new ArrayList<>(get.getProjections()); + public Optional get(Get originalGet) throws CrudException { + List originalProjections = new ArrayList<>(originalGet.getProjections()); + Get get = (Get) prepareStorageSelection(originalGet); Optional result; Snapshot.Key key = new Snapshot.Key(get); - if (snapshot.containsKeyInReadSet(key)) { - return createGetResult(key, originalProjections); + if (snapshot.containsKeyInGetSet(get)) { + return createGetResult(key, get, originalProjections); } result = getFromStorage(get); if (!result.isPresent() || result.get().isCommitted()) { + // Keep the read set latest to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. snapshot.put(key, result); - return createGetResult(key, originalProjections); + snapshot.put(get, result); // for re-read and validation + return createGetResult(key, get, originalProjections); } throw new UncommittedRecordException( result.get(), "this record needs recovery", snapshot.getId()); } - private Optional createGetResult(Snapshot.Key key, List projections) + private Optional createGetResult(Snapshot.Key key, Get get, List projections) throws CrudException { TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable()); return snapshot - .get(key) + .mergeResult(key, snapshot.get(get)) .map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled)); } @@ -104,20 +113,22 @@ public List scan(Scan scan) throws CrudException { return results; } - private List scanInternal(Scan scan) throws CrudException { - List originalProjections = new ArrayList<>(scan.getProjections()); + private List scanInternal(Scan originalScan) throws CrudException { + List originalProjections = new ArrayList<>(originalScan.getProjections()); + Scan scan = (Scan) prepareStorageSelection(originalScan); - List results = new ArrayList<>(); + Map results = new LinkedHashMap<>(); - Optional> keysInSnapshot = snapshot.get(scan); - if (keysInSnapshot.isPresent()) { - for (Snapshot.Key key : keysInSnapshot.get()) { - snapshot.get(key).ifPresent(results::add); + Optional> resultsInSnapshot = snapshot.get(scan); + if (resultsInSnapshot.isPresent()) { + for (Entry entry : resultsInSnapshot.get().entrySet()) { + snapshot + .mergeResult(entry.getKey(), Optional.of(entry.getValue())) + .ifPresent(result -> results.put(entry.getKey(), result)); } return createScanResults(scan, originalProjections, results); } - List keys = new ArrayList<>(); Scanner scanner = null; try { scanner = getFromStorage(scan); @@ -130,12 +141,12 @@ private List scanInternal(Scan scan) throws CrudException { Snapshot.Key key = new Snapshot.Key(scan, r); - if (!snapshot.containsKeyInReadSet(key)) { - snapshot.put(key, Optional.of(result)); - } + // We always update the read set to create before image by using the latest record (result) + // because another conflicting transaction might have updated the record after this + // transaction read it first. + snapshot.put(key, Optional.of(result)); - keys.add(key); - snapshot.get(key).ifPresent(results::add); + snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value)); } } finally { if (scanner != null) { @@ -146,15 +157,16 @@ private List scanInternal(Scan scan) throws CrudException { } } } - snapshot.put(scan, keys); + snapshot.put(scan, results); return createScanResults(scan, originalProjections, results); } - private List createScanResults(Scan scan, List projections, List results) + private List createScanResults( + Scan scan, List projections, Map results) throws CrudException { TableMetadata metadata = getTableMetadata(scan.forNamespace().get(), scan.forTable().get()); - return results.stream() + return results.values().stream() .map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled)) .collect(Collectors.toList()); } @@ -171,37 +183,38 @@ public void delete(Delete delete) throws UnsatisfiedConditionException { snapshot.put(new Snapshot.Key(delete), delete); } - private Optional getFromStorage(Get get) throws CrudException { + @VisibleForTesting + Optional getFromStorage(Get get) throws CrudException { try { - get.clearProjections(); - // Retrieve only the after images columns when including the metadata is disabled, otherwise - // retrieve all the columns - if (!isIncludeMetadataEnabled) { - LinkedHashSet afterImageColumnNames = - tableMetadataManager.getTransactionTableMetadata(get).getAfterImageColumnNames(); - get.withProjections(afterImageColumnNames); - } - get.withConsistency(Consistency.LINEARIZABLE); return storage.get(get).map(TransactionResult::new); } catch (ExecutionException e) { throw new CrudException("get failed", e, snapshot.getId()); } } - private Scanner getFromStorage(Scan scan) throws CrudException { + @VisibleForTesting + Scanner getFromStorage(Scan scan) throws CrudException { + try { + return storage.scan(scan); + } catch (ExecutionException e) { + throw new CrudException("scan failed", e, snapshot.getId()); + } + } + + private Selection prepareStorageSelection(Selection selection) throws CrudException { try { - scan.clearProjections(); + selection.clearProjections(); // Retrieve only the after images columns when including the metadata is disabled, otherwise // retrieve all the columns if (!isIncludeMetadataEnabled) { LinkedHashSet afterImageColumnNames = - tableMetadataManager.getTransactionTableMetadata(scan).getAfterImageColumnNames(); - scan.withProjections(afterImageColumnNames); + tableMetadataManager.getTransactionTableMetadata(selection).getAfterImageColumnNames(); + selection.withProjections(afterImageColumnNames); } - scan.withConsistency(Consistency.LINEARIZABLE); - return storage.scan(scan); + selection.withConsistency(Consistency.LINEARIZABLE); + return selection; } catch (ExecutionException e) { - throw new CrudException("scan failed", e, snapshot.getId()); + throw new CrudException("getting a table metadata failed", e, snapshot.getId()); } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 5338e8d846..93a4c13587 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -49,7 +49,8 @@ public class Snapshot { private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; private final Map> readSet; - private final Map> scanSet; + private final Map> getSet; + private final Map> scanSet; private final Map writeSet; private final Map deleteSet; @@ -65,6 +66,7 @@ public Snapshot( this.tableMetadataManager = tableMetadataManager; this.parallelExecutor = parallelExecutor; readSet = new HashMap<>(); + getSet = new HashMap<>(); scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); @@ -78,7 +80,8 @@ public Snapshot( TransactionTableMetadataManager tableMetadataManager, ParallelExecutor parallelExecutor, Map> readSet, - Map> scanSet, + Map> getSet, + Map> scanSet, Map writeSet, Map deleteSet) { this.id = id; @@ -87,6 +90,7 @@ public Snapshot( this.tableMetadataManager = tableMetadataManager; this.parallelExecutor = parallelExecutor; this.readSet = readSet; + this.getSet = getSet; this.scanSet = scanSet; this.writeSet = writeSet; this.deleteSet = deleteSet; @@ -107,8 +111,12 @@ public void put(Key key, Optional result) { readSet.put(key, result); } - public void put(Scan scan, List keys) { - scanSet.put(scan, keys); + public void put(Get get, Optional result) { + getSet.put(get, result); + } + + public void put(Scan scan, Map results) { + scanSet.put(scan, results); } public void put(Key key, Put put) { @@ -137,21 +145,18 @@ public Optional getFromReadSet(Key key) { return readSet.containsKey(key) ? readSet.get(key) : Optional.empty(); } - public Optional get(Key key) throws CrudException { + public Optional mergeResult(Key key, Optional result) + throws CrudException { if (deleteSet.containsKey(key)) { return Optional.empty(); - } else if (readSet.containsKey(key)) { - if (writeSet.containsKey(key)) { - // merge the result in the read set and the put in the write set - return Optional.of( - new TransactionResult( - new MergedResult(readSet.get(key), writeSet.get(key), getTableMetadata(key)))); - } else { - return readSet.get(key); - } + } else if (writeSet.containsKey(key)) { + // merge the result in the read set and the put in the write set + return Optional.of( + new TransactionResult( + new MergedResult(result, writeSet.get(key), getTableMetadata(key)))); + } else { + return result; } - throw new IllegalArgumentException( - "getting data neither in the read set nor the delete set is not allowed"); } private TableMetadata getTableMetadata(Key key) throws CrudException { @@ -185,7 +190,17 @@ private TableMetadata getTableMetadata(Scan scan) throws ExecutionException { } } - public Optional> get(Scan scan) { + public boolean containsKeyInGetSet(Get get) { + return getSet.containsKey(get); + } + + public Optional get(Get get) { + // We expect this method is called after putting the result of the get operation in the get set. + assert getSet.containsKey(get); + return getSet.get(get); + } + + public Optional> get(Scan scan) { if (scanSet.containsKey(scan)) { return Optional.ofNullable(scanSet.get(scan)); } @@ -222,6 +237,10 @@ private boolean isWriteSetOverlappedWith(Scan scan) { } for (Map.Entry entry : writeSet.entrySet()) { + if (scanSet.get(scan).containsKey(entry.getKey())) { + return true; + } + Put put = entry.getValue(); if (!put.forNamespace().equals(scan.forNamespace()) @@ -278,7 +297,7 @@ private boolean isWriteSetOverlappedWith(Scan scan) { private boolean isWriteSetOverlappedWith(ScanWithIndex scan) { for (Map.Entry entry : writeSet.entrySet()) { - if (scanSet.get(scan).contains(entry.getKey())) { + if (scanSet.get(scan).containsKey(entry.getKey())) { return true; } @@ -315,7 +334,7 @@ private boolean isWriteSetOverlappedWith(ScanAll scan) { // yet. Thus, we need to evaluate if the scan condition potentially matches put operations. // Check for cases 1 and 2 - if (scanSet.get(scan).contains(entry.getKey())) { + if (scanSet.get(scan).containsKey(entry.getKey())) { return true; } @@ -433,14 +452,14 @@ void toSerializableWithExtraRead(DistributedStorage storage) List tasks = new ArrayList<>(); // Read set by scan is re-validated to check if there is no anti-dependency - for (Map.Entry> entry : scanSet.entrySet()) { + for (Map.Entry> entry : scanSet.entrySet()) { tasks.add( () -> { Map currentReadMap = new HashMap<>(); Set validatedReadSet = new HashSet<>(); Scanner scanner = null; + Scan scan = entry.getKey(); try { - Scan scan = entry.getKey(); // only get tx_id and tx_version columns because we use only them to compare scan.clearProjections(); scan.withProjection(Attribute.ID).withProjection(Attribute.VERSION); @@ -464,13 +483,15 @@ void toSerializableWithExtraRead(DistributedStorage storage) } } - for (Key key : entry.getValue()) { + for (Map.Entry e : entry.getValue().entrySet()) { + Key key = e.getKey(); + TransactionResult result = e.getValue(); if (writeSet.containsKey(key) || deleteSet.containsKey(key)) { continue; } // Check if read records are not changed TransactionResult latestResult = currentReadMap.get(key); - if (isChanged(Optional.ofNullable(latestResult), readSet.get(key))) { + if (isChanged(Optional.ofNullable(latestResult), Optional.of(result))) { throwExceptionDueToAntiDependency(); } validatedReadSet.add(key); @@ -483,35 +504,23 @@ void toSerializableWithExtraRead(DistributedStorage storage) }); } - // Calculate read set validated by scan - Set validatedReadSetByScan = new HashSet<>(); - for (List values : scanSet.values()) { - validatedReadSetByScan.addAll(values); - } - // Read set by get is re-validated to check if there is no anti-dependency - for (Map.Entry> entry : readSet.entrySet()) { - Key key = entry.getKey(); - if (writeSet.containsKey(key) - || deleteSet.containsKey(key) - || validatedReadSetByScan.contains(key)) { + for (Map.Entry> entry : getSet.entrySet()) { + Get get = entry.getKey(); + Key key = new Key(get); + if (writeSet.containsKey(key) || deleteSet.containsKey(key)) { continue; } tasks.add( () -> { + Optional originalResult = getSet.get(get); // only get tx_id and tx_version columns because we use only them to compare - Get get = - new Get(key.getPartitionKey(), key.getClusteringKey().orElse(null)) - .withProjection(Attribute.ID) - .withProjection(Attribute.VERSION) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(key.getNamespace()) - .forTable(key.getTable()); - + get.clearProjections(); + get.withProjection(Attribute.ID).withProjection(Attribute.VERSION); Optional latestResult = storage.get(get).map(TransactionResult::new); // Check if a read record is not changed - if (isChanged(latestResult, entry.getValue())) { + if (isChanged(latestResult, originalResult)) { throwExceptionDueToAntiDependency(); } }); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 7fcc574f70..914b76bef1 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import com.scalar.db.api.ConditionBuilder; +import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.Get; @@ -62,6 +63,8 @@ public class CrudHandlerTest { .addPartitionKey(ANY_NAME_1) .addClusteringKey(ANY_NAME_2) .build()); + private static final TransactionTableMetadata TRANSACTION_TABLE_METADATA = + new TransactionTableMetadata(TABLE_METADATA); private CrudHandler handler; @Mock private DistributedStorage storage; @@ -94,6 +97,14 @@ private Get prepareGet() { .forTable(ANY_TABLE_NAME); } + private Get toGetForStorageFrom(Get get) { + return Get.newBuilder(get) + .clearProjections() + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build(); + } + private Scan prepareScan() { Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); return new Scan(partitionKey).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); @@ -108,6 +119,14 @@ private Scan prepareRelationalScan() { .build(); } + private Scan toScanForStorageFrom(Scan scan) { + return Scan.newBuilder(scan) + .clearProjections() + .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames()) + .consistency(Consistency.LINEARIZABLE) + .build(); + } + private TransactionResult prepareResult(TransactionState state) { ImmutableMap> columns = ImmutableMap.>builder() @@ -127,12 +146,14 @@ private TransactionResult prepareResult(TransactionState state) { } @Test - public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudException { + public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudException { // Arrange Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); - when(snapshot.containsKeyInReadSet(new Snapshot.Key(get))).thenReturn(true); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(expected); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(true); + when(snapshot.get(getForStorage)).thenReturn(expected); + when(snapshot.mergeResult(new Snapshot.Key(getForStorage), expected)).thenReturn(expected); // Act Optional actual = handler.get(get); @@ -147,18 +168,21 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept @Test public void - get_KeyNotExistsInSnapshotAndRecordInStorageCommitted_ShouldReturnFromStorageAndUpdateSnapshot() + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_ShouldReturnFromStorageAndUpdateSnapshot() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); - Snapshot.Key key = new Snapshot.Key(get); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); - when(storage.get(get)).thenReturn(expected); - when(snapshot.get(key)).thenReturn(expected.map(e -> (TransactionResult) e)); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.get(getForStorage)).thenReturn(transactionResult); + when(snapshot.mergeResult(key, transactionResult)).thenReturn(transactionResult); // Act Optional result = handler.get(get); @@ -169,31 +193,35 @@ public void get_KeyExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept Optional.of( new FilteredResult( expected.get(), Collections.emptyList(), TABLE_METADATA, false))); - verify(storage).get(get); + verify(storage).get(getForStorage); verify(snapshot).put(key, Optional.of((TransactionResult) expected.get())); + verify(snapshot).put(get, Optional.of((TransactionResult) expected.get())); } @Test public void - get_KeyNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() - throws CrudException, ExecutionException { + get_GetNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() + throws ExecutionException { // Arrange Get get = prepareGet(); - Optional expected = Optional.of(prepareResult(TransactionState.PREPARED)); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); - when(storage.get(get)).thenReturn(expected); + Get getForStorage = toGetForStorageFrom(get); + result = prepareResult(TransactionState.PREPARED); + Optional expected = Optional.of(result); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); // Act Assert assertThatThrownBy(() -> handler.get(get)).isInstanceOf(UncommittedRecordException.class); } @Test - public void get_KeyNeitherExistsInSnapshotNorInStorage_ShouldReturnEmpty() + public void get_GetNotExistsInSnapshotAndRecordNotExistsInStorage_ShouldReturnEmpty() throws CrudException, ExecutionException { // Arrange Get get = prepareGet(); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); - when(storage.get(get)).thenReturn(Optional.empty()); + Get getForStorage = toGetForStorageFrom(get); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(Optional.empty()); // Act Optional result = handler.get(get); @@ -204,10 +232,10 @@ public void get_KeyNeitherExistsInSnapshotNorInStorage_ShouldReturnEmpty() @Test public void get_KeyNotExistsInCrudSetAndExceptionThrownInStorage_ShouldThrowCrudException() - throws CrudException, ExecutionException { + throws ExecutionException { // Arrange Get get = prepareGet(); - when(snapshot.get(new Snapshot.Key(get))).thenReturn(Optional.empty()); + when(snapshot.get(get)).thenReturn(Optional.empty()); ExecutionException toThrow = mock(ExecutionException.class); when(storage.get(get)).thenThrow(toThrow); @@ -215,25 +243,107 @@ public void get_KeyNotExistsInCrudSetAndExceptionThrownInStorage_ShouldThrowCrud assertThatThrownBy(() -> handler.get(get)).isInstanceOf(CrudException.class).hasCause(toThrow); } + @Test + public void get_GetNotExistsInSnapshotAndExceptionThrownInStorage_ShouldThrowCrudException() + throws ExecutionException { + // Arrange + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + ExecutionException toThrow = mock(ExecutionException.class); + when(storage.get(getForStorage)).thenThrow(toThrow); + + // Act Assert + assertThatThrownBy(() -> handler.get(get)).isInstanceOf(CrudException.class).hasCause(toThrow); + } + + @Test + public void get_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() + throws ExecutionException, CrudException { + // Arrange + Get originalGet = prepareGet(); + Get getForStorage = toGetForStorageFrom(originalGet); + Get get1 = prepareGet(); + Get get2 = prepareGet(); + Result result = prepareResult(TransactionState.COMMITTED); + Optional expected = Optional.of(new TransactionResult(result)); + doNothing() + .when(snapshot) + .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false).thenReturn(true); + when(snapshot.get(getForStorage)).thenReturn(expected).thenReturn(expected); + when(snapshot.mergeResult(key, expected)).thenReturn(expected).thenReturn(expected); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + + // Act + Optional results1 = handler.get(get1); + Optional results2 = handler.get(get2); + + // Assert + verify(snapshot).put(key, expected); + assertThat(results1) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + assertThat(results1).isEqualTo(results2); + verify(storage, never()).get(originalGet); + verify(storage).get(getForStorage); + } + + @Test + public void get_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot() + throws ExecutionException, CrudException { + // Arrange + Get originalGet = prepareGet(); + Get getForStorage = toGetForStorageFrom(originalGet); + Get get1 = prepareGet(); + Get get2 = prepareGet(); + Result result = prepareResult(TransactionState.COMMITTED); + Optional expected = Optional.of(new TransactionResult(result)); + snapshot = + new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); + handler = + new CrudHandler( + storage, snapshot, tableMetadataManager, false, mutationConditionsValidator); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + + // Act + Optional results1 = handler.get(get1); + Optional results2 = handler.get(get2); + + // Assert + assertThat(results1) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + assertThat(results1).isEqualTo(results2); + verify(storage, never()).get(originalGet); + verify(storage).get(getForStorage); + } + @Test public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + TransactionResult expected = new TransactionResult(result); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); // Act List results = handler.scan(scan); // Assert - TransactionResult expected = new TransactionResult(result); verify(snapshot).put(key, Optional.of(expected)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); @@ -247,9 +357,10 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() throws ExecutionException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.PREPARED); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); // Act assertThatThrownBy(() -> handler.scan(scan)).isInstanceOf(UncommittedRecordException.class); @@ -263,99 +374,119 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan originalScan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(originalScan); + Scan scan1 = prepareScan(); + Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); + TransactionResult expected = new TransactionResult(result); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); - Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(scan)) + when(storage.scan(scanForStorage)).thenReturn(scanner); + Snapshot.Key key = new Snapshot.Key(scanForStorage, result); + when(snapshot.get(scanForStorage)) .thenReturn(Optional.empty()) - .thenReturn(Optional.of(Collections.singletonList(key))); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + .thenReturn(Optional.of(Collections.singletonMap(key, expected))); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); // Act - List results1 = handler.scan(scan); - List results2 = handler.scan(scan); + List results1 = handler.scan(scan1); + List results2 = handler.scan(scan2); // Assert - TransactionResult expected = new TransactionResult(result); verify(snapshot).put(key, Optional.of(expected)); assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); assertThat(results1).isEqualTo(results2); + verify(storage, never()).scan(originalScan); + verify(storage).scan(scanForStorage); } @Test public void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan originalScan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(originalScan); + Scan scan1 = prepareScan(); + Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); + TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); // Act - List results1 = handler.scan(scan); - List results2 = handler.scan(scan); + List results1 = handler.scan(scan1); + List results2 = handler.scan(scan2); // Assert - TransactionResult expected = new TransactionResult(result); assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); assertThat(results1).isEqualTo(results2); + verify(storage, never()).scan(originalScan); + verify(storage).scan(scanForStorage); } @Test - public void scan_GetCalledAfterScan_ShouldReturnFromSnapshot() + public void scan_GetCalledAfterScan_ShouldReturnFromStorage() throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); - when(storage.scan(scan)).thenReturn(scanner); - Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(scan)).thenReturn(Optional.empty()); - when(snapshot.containsKeyInReadSet(key)).thenReturn(false).thenReturn(true); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional transactionResult = Optional.of(new TransactionResult(result)); + when(storage.get(getForStorage)).thenReturn(Optional.of(result)); + when(snapshot.get(get)).thenReturn(transactionResult); + when(snapshot.mergeResult(any(), any())).thenReturn(transactionResult); + when(snapshot.mergeResult(new Snapshot.Key(get), transactionResult)) + .thenReturn(transactionResult); // Act List results = handler.scan(scan); - Optional result = handler.get(prepareGet()); + Optional result = handler.get(get); // Assert - verify(storage, never()).get(any(Get.class)); + verify(storage).scan(scanForStorage); + verify(storage).get(getForStorage); assertThat(results.get(0)).isEqualTo(result.get()); } @Test - public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromSnapshot() + public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage() throws ExecutionException, CrudException { // Arrange - Scan scan = prepareScan(); + Scan scan = toScanForStorageFrom(prepareScan()); result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, null, tableMetadataManager, parallelExecutor); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scan)).thenReturn(scanner); + Get get = toGetForStorageFrom(prepareGet()); + when(storage.get(get)).thenReturn(Optional.of(result)); // Act List results = handler.scan(scan); - Optional result = handler.get(prepareGet()); + Optional result = handler.get(get); // Assert + verify(storage).scan(scan); + verify(storage).get(get); assertThat(results.get(0)).isEqualTo(result.get()); } @@ -364,6 +495,7 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe throws ExecutionException, CrudException { // Arrange Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); ImmutableMap> columns = @@ -396,10 +528,11 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe readSet, new HashMap<>(), new HashMap<>(), + new HashMap<>(), deleteSet); handler = new CrudHandler(storage, snapshot, tableMetadataManager, false); when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); - when(storage.scan(scan)).thenReturn(scanner); + when(storage.scan(scanForStorage)).thenReturn(scanner); Delete delete = new Delete(new Key(ANY_NAME_1, ANY_TEXT_1), new Key(ANY_NAME_2, ANY_TEXT_3)) @@ -437,23 +570,24 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe Scan scan = prepareRelationalScan(); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - when(snapshot.get(key)).thenReturn(Optional.of((TransactionResult) result)); doNothing() .when(snapshot) .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(any(ScanAll.class))).thenReturn(scanner); + TransactionResult transactionResult = new TransactionResult(result); + when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(transactionResult)); // Act List results = handler.scan(scan); // Assert - TransactionResult expected = new TransactionResult(result); - verify(snapshot).put(key, Optional.of(expected)); + verify(snapshot).put(key, Optional.of(transactionResult)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); assertThat(results.get(0)) - .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + .isEqualTo( + new FilteredResult(transactionResult, Collections.emptyList(), TABLE_METADATA, false)); } @Test @@ -495,13 +629,12 @@ public void put_WithResultInReadSet_shouldCallAppropriateMethods() } @Test - public void put_WithoutResultInReadSet_shouldCallAppropriateMethods() - throws UnsatisfiedConditionException { + public void put_WithoutResultInReadSet_shouldCallAppropriateMethods() throws CrudException { // Arrange Put put = Put.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofText("c1", "foo")).build(); Snapshot.Key key = new Snapshot.Key(put); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.empty()); + when(snapshot.containsKeyInReadSet(any())).thenReturn(false); // Act handler.put(put); @@ -536,8 +669,7 @@ public void delete_WithResultInReadSet_shouldCallAppropriateMethods() } @Test - public void delete_WithoutResultInReadSet_shouldCallAppropriateMethods() - throws UnsatisfiedConditionException { + public void delete_WithoutResultInReadSet_shouldCallAppropriateMethods() throws CrudException { // Arrange Delete delete = Delete.newBuilder() @@ -546,7 +678,7 @@ public void delete_WithoutResultInReadSet_shouldCallAppropriateMethods() .partitionKey(Key.ofText("c1", "foo")) .build(); Snapshot.Key key = new Snapshot.Key(delete); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.empty()); + when(snapshot.containsKeyInReadSet(any())).thenReturn(false); // Act handler.delete(delete); @@ -556,4 +688,176 @@ public void delete_WithoutResultInReadSet_shouldCallAppropriateMethods() verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, null); verify(snapshot).put(key, delete); } + + /* + @SuppressWarnings("unchecked") + @Test + public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException { + // Arrange + Snapshot.Key key = mock(Snapshot.Key.class); + when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); + when(key.getTable()).thenReturn(ANY_TABLE_NAME); + when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + Get getForKey = + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(true); + + // Act + handler.readUnread(key, getForKey); + + // Assert + verify(storage, never()).get(any()); + verify(snapshot, never()).put(any(Get.class), any(Optional.class)); + } + + @Test + public void + readUnread_GetNotContainedInGetSet_EmptyResultReturnedByStorage_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException { + // Arrange + Snapshot.Key key = mock(Snapshot.Key.class); + when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); + when(key.getTable()).thenReturn(ANY_TABLE_NAME); + when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + Get getForKey = + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); + when(storage.get(any())).thenReturn(Optional.empty()); + + // Act + handler.readUnread(key, getForKey); + + // Assert + verify(storage).get(any()); + verify(snapshot).put(key, Optional.empty()); + verify(snapshot).put(getForKey, Optional.empty()); + } + + @Test + public void + readUnread_GetNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException { + // Arrange + Snapshot.Key key = mock(Snapshot.Key.class); + when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); + when(key.getTable()).thenReturn(ANY_TABLE_NAME); + when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + + Result result = mock(Result.class); + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(storage.get(any())).thenReturn(Optional.of(result)); + + Get getForKey = + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); + + // Act + handler.readUnread(key, getForKey); + + // Assert + verify(storage).get(any()); + verify(snapshot).put(key, Optional.of(new TransactionResult(result))); + } + + @Test + public void + readUnread_GetNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException() + throws ExecutionException { + // Arrange + Snapshot.Key key = mock(Snapshot.Key.class); + when(key.getNamespace()).thenReturn(ANY_NAMESPACE_NAME); + when(key.getTable()).thenReturn(ANY_TABLE_NAME); + when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + + Result result = mock(Result.class); + when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get()); + when(storage.get(any())).thenReturn(Optional.of(result)); + + Get getForKey = + Get.newBuilder() + .namespace(key.getNamespace()) + .table(key.getTable()) + .partitionKey(key.getPartitionKey()) + .build(); + when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false); + + // Act Assert + assertThatThrownBy(() -> handler.readUnread(key, getForKey)) + .isInstanceOf(UncommittedRecordException.class) + .satisfies( + e -> { + UncommittedRecordException exception = (UncommittedRecordException) e; + assertThat(exception.getSelection()).isEqualTo(getForKey); + assertThat(exception.getResults().size()).isEqualTo(1); + assertThat(exception.getResults().get(0)).isEqualTo(result); + }); + } + + @Test + public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException { + // Arrange + Put put1 = mock(Put.class); + when(put1.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(put1.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(put1.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + when(put1.isImplicitPreReadEnabled()).thenReturn(true); + + Put put2 = mock(Put.class); + when(put2.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(put2.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(put2.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_2)); + when(put2.isImplicitPreReadEnabled()).thenReturn(true); + + Put put3 = mock(Put.class); + when(put3.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(put3.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(put3.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_3)); + when(put3.isImplicitPreReadEnabled()).thenReturn(false); + + when(snapshot.getPutsInWriteSet()).thenReturn(Arrays.asList(put1, put2, put3)); + + Delete delete1 = mock(Delete.class); + when(delete1.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(delete1.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(delete1.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + + Delete delete2 = mock(Delete.class); + when(delete2.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); + when(delete2.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); + when(delete2.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_2)); + + when(snapshot.getDeletesInDeleteSet()).thenReturn(Arrays.asList(delete1, delete2)); + + when(snapshot.getId()).thenReturn(ANY_TX_ID); + + // Act + handler.readIfImplicitPreReadEnabled(); + + // Assert + @SuppressWarnings("unchecked") + ArgumentCaptor> tasksCaptor = + ArgumentCaptor.forClass(List.class); + ArgumentCaptor transactionIdCaptor = ArgumentCaptor.forClass(String.class); + verify(parallelExecutor) + .executeImplicitPreRead(tasksCaptor.capture(), transactionIdCaptor.capture()); + + List tasks = tasksCaptor.getValue(); + assertThat(tasks.size()).isEqualTo(4); + + assertThat(transactionIdCaptor.getValue()).isEqualTo(ANY_TX_ID); + } + */ } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index 0a201b0e6f..54da76a8d2 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -44,7 +44,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -90,7 +89,8 @@ public class SnapshotTest { private Snapshot snapshot; private Map> readSet; - private Map> scanSet; + private Map> getSet; + private Map> scanSet; private Map writeSet; private Map deleteSet; @@ -117,6 +117,7 @@ private Snapshot prepareSnapshot(Isolation isolation) { private Snapshot prepareSnapshot(Isolation isolation, SerializableStrategy strategy) { readSet = new HashMap<>(); + getSet = new HashMap<>(); scanSet = new HashMap<>(); writeSet = new HashMap<>(); deleteSet = new HashMap<>(); @@ -129,6 +130,7 @@ private Snapshot prepareSnapshot(Isolation isolation, SerializableStrategy strat tableMetadataManager, new ParallelExecutor(config), readSet, + getSet, scanSet, writeSet, deleteSet)); @@ -247,6 +249,17 @@ private Put preparePutWithIntColumns() { .build(); } + private Put preparePutForMergeTest() { + return Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .textValue(ANY_NAME_3, ANY_TEXT_5) + .textValue(ANY_NAME_4, null) + .build(); + } + private Delete prepareDelete() { Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); @@ -352,8 +365,9 @@ public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Scan scan = prepareScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - List expected = Collections.singletonList(key); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + Map expected = Collections.singletonMap(key, result); // Act snapshot.put(scan, expected); @@ -363,7 +377,7 @@ public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { } @Test - public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() + public void mergeResult_KeyGivenContainedInWriteSet_ShouldReturnMergedResult() throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); @@ -382,11 +396,47 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() snapshot.put(key, put); // Act - Optional actual = snapshot.get(key); + Optional actual = snapshot.mergeResult(key, Optional.of(result)); // Assert assertThat(actual).isPresent(); - assertThat(actual.get().getValues()) + assertMergedResultIsEqualTo(actual.get()); + } + + @Test + public void mergeResult_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Snapshot.Key key = new Snapshot.Key(delete); + snapshot.put(key, delete); + TransactionResult result = prepareResult(ANY_ID); + + // Act + Optional actual = snapshot.mergeResult(key, Optional.of(result)); + + // Assert + assertThat(actual).isNotPresent(); + } + + @Test + public void + mergeResult_KeyGivenNeitherContainedInDeleteSetNorWriteSet_ShouldReturnOriginalResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Snapshot.Key key = new Snapshot.Key(prepareGet()); + TransactionResult result = prepareResult(ANY_ID); + + // Act + Optional actual = snapshot.mergeResult(key, Optional.of(result)); + + // Assert + assertThat(actual).isEqualTo(Optional.of(result)); + } + + private void assertMergedResultIsEqualTo(TransactionResult result) { + assertThat(result.getValues()) .isEqualTo( ImmutableMap.>builder() .put(ANY_NAME_1, new TextValue(ANY_NAME_1, ANY_TEXT_1)) @@ -396,25 +446,22 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() .put(Attribute.ID, Attribute.toIdValue(ANY_ID)) .put(Attribute.VERSION, Attribute.toVersionValue(ANY_VERSION)) .build()); - assertThat(actual.get().getValue(ANY_NAME_1).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_1).get()) - .isEqualTo(new TextValue(ANY_NAME_1, ANY_TEXT_1)); - assertThat(actual.get().getValue(ANY_NAME_2).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_2).get()) - .isEqualTo(new TextValue(ANY_NAME_2, ANY_TEXT_2)); - assertThat(actual.get().getValue(ANY_NAME_3).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_3).get()) - .isEqualTo(new TextValue(ANY_NAME_3, ANY_TEXT_5)); - assertThat(actual.get().getValue(ANY_NAME_4).isPresent()).isTrue(); - assertThat(actual.get().getValue(ANY_NAME_4).get()) + assertThat(result.getValue(ANY_NAME_1).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_1).get()).isEqualTo(new TextValue(ANY_NAME_1, ANY_TEXT_1)); + assertThat(result.getValue(ANY_NAME_2).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_2).get()).isEqualTo(new TextValue(ANY_NAME_2, ANY_TEXT_2)); + assertThat(result.getValue(ANY_NAME_3).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_3).get()).isEqualTo(new TextValue(ANY_NAME_3, ANY_TEXT_5)); + assertThat(result.getValue(ANY_NAME_4).isPresent()).isTrue(); + assertThat(result.getValue(ANY_NAME_4).get()) .isEqualTo(new TextValue(ANY_NAME_4, (String) null)); - assertThat(actual.get().getValue(Attribute.ID).isPresent()).isTrue(); - assertThat(actual.get().getValue(Attribute.ID).get()).isEqualTo(Attribute.toIdValue(ANY_ID)); - assertThat(actual.get().getValue(Attribute.VERSION).isPresent()).isTrue(); - assertThat(actual.get().getValue(Attribute.VERSION).get()) + assertThat(result.getValue(Attribute.ID).isPresent()).isTrue(); + assertThat(result.getValue(Attribute.ID).get()).isEqualTo(Attribute.toIdValue(ANY_ID)); + assertThat(result.getValue(Attribute.VERSION).isPresent()).isTrue(); + assertThat(result.getValue(Attribute.VERSION).get()) .isEqualTo(Attribute.toVersionValue(ANY_VERSION)); - assertThat(actual.get().getContainedColumnNames()) + assertThat(result.getContainedColumnNames()) .isEqualTo( new HashSet<>( Arrays.asList( @@ -425,105 +472,35 @@ public void get_KeyGivenContainedInWriteSetAndReadSet_ShouldReturnMergedResult() Attribute.ID, Attribute.VERSION))); - assertThat(actual.get().contains(ANY_NAME_1)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_1)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - assertThat(actual.get().getAsObject(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - - assertThat(actual.get().contains(ANY_NAME_2)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_2)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - assertThat(actual.get().getAsObject(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - - assertThat(actual.get().contains(ANY_NAME_3)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_3)).isFalse(); - assertThat(actual.get().getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - assertThat(actual.get().getAsObject(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - - assertThat(actual.get().contains(ANY_NAME_4)).isTrue(); - assertThat(actual.get().isNull(ANY_NAME_4)).isTrue(); - assertThat(actual.get().getText(ANY_NAME_4)).isNull(); - assertThat(actual.get().getAsObject(ANY_NAME_4)).isNull(); - - assertThat(actual.get().contains(Attribute.ID)).isTrue(); - assertThat(actual.get().isNull(Attribute.ID)).isFalse(); - assertThat(actual.get().getText(Attribute.ID)).isEqualTo(ANY_ID); - assertThat(actual.get().getAsObject(Attribute.ID)).isEqualTo(ANY_ID); - - assertThat(actual.get().contains(Attribute.VERSION)).isTrue(); - assertThat(actual.get().isNull(Attribute.VERSION)).isFalse(); - assertThat(actual.get().getInt(Attribute.VERSION)).isEqualTo(ANY_VERSION); - assertThat(actual.get().getAsObject(Attribute.VERSION)).isEqualTo(ANY_VERSION); - } - - @Test - public void get_KeyGivenContainedInReadSet_ShouldReturnFromReadSet() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); - - // Act - Optional actual = snapshot.get(key); - - // Assert - assertThat(actual).isEqualTo(Optional.of(result)); - } - - @Test - public void get_KeyGivenNotContainedInSnapshot_ShouldThrowIllegalArgumentException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - - // Act Assert - assertThatThrownBy(() -> snapshot.get(key)).isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void get_KeyGivenContainedInWriteSet_ShouldThrowIllegalArgumentException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Put put = preparePut(); - Snapshot.Key key = new Snapshot.Key(put); - snapshot.put(key, put); - - // Act Assert - assertThatThrownBy(() -> snapshot.get(key)).isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void get_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Delete delete = prepareDelete(); - Snapshot.Key key = new Snapshot.Key(delete); - snapshot.put(key, delete); + assertThat(result.contains(ANY_NAME_1)).isTrue(); + assertThat(result.isNull(ANY_NAME_1)).isFalse(); + assertThat(result.getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); + assertThat(result.getAsObject(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); - // Act - Optional actual = snapshot.get(key); - - // Assert - assertThat(actual).isNotPresent(); - } + assertThat(result.contains(ANY_NAME_2)).isTrue(); + assertThat(result.isNull(ANY_NAME_2)).isFalse(); + assertThat(result.getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); + assertThat(result.getAsObject(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); - @Test - public void get_KeyGivenContainedInReadSetAndDeleteSet_ShouldReturnEmpty() throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); + assertThat(result.contains(ANY_NAME_3)).isTrue(); + assertThat(result.isNull(ANY_NAME_3)).isFalse(); + assertThat(result.getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); + assertThat(result.getAsObject(ANY_NAME_3)).isEqualTo(ANY_TEXT_5); - Delete delete = prepareDelete(); - snapshot.put(key, delete); + assertThat(result.contains(ANY_NAME_4)).isTrue(); + assertThat(result.isNull(ANY_NAME_4)).isTrue(); + assertThat(result.getText(ANY_NAME_4)).isNull(); + assertThat(result.getAsObject(ANY_NAME_4)).isNull(); - // Act - Optional actual = snapshot.get(key); + assertThat(result.contains(Attribute.ID)).isTrue(); + assertThat(result.isNull(Attribute.ID)).isFalse(); + assertThat(result.getText(Attribute.ID)).isEqualTo(ANY_ID); + assertThat(result.getAsObject(Attribute.ID)).isEqualTo(ANY_ID); - // Assert - assertThat(actual).isNotPresent(); + assertThat(result.contains(Attribute.VERSION)).isTrue(); + assertThat(result.isNull(Attribute.VERSION)).isFalse(); + assertThat(result.getInt(Attribute.VERSION)).isEqualTo(ANY_VERSION); + assertThat(result.getAsObject(Attribute.VERSION)).isEqualTo(ANY_VERSION); } @Test @@ -533,10 +510,10 @@ public void get_ScanNotContainedInSnapshotGiven_ShouldReturnEmptyList() { Scan scan = prepareScan(); // Act - Optional> keys = snapshot.get(scan); + Optional> results = snapshot.get(scan); // Assert - assertThat(keys.isPresent()).isFalse(); + assertThat(results.isPresent()).isFalse(); } @Test @@ -687,6 +664,7 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -713,6 +691,7 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea Get get = prepareAnotherGet(); Put put = preparePut(); snapshot.put(new Snapshot.Key(get), Optional.empty()); + snapshot.put(get, Optional.empty()); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -734,9 +713,11 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_WRITE); Scan scan = prepareScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); + TransactionResult txResult = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, txResult); Put put = preparePut(); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(key, Optional.of(txResult)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); // Act Assert @@ -756,6 +737,7 @@ public void toSerializableWithExtraRead_ReadSetNotChanged_ShouldProcessWithoutEx TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -778,6 +760,7 @@ public void toSerializableWithExtraRead_ReadSetUpdated_ShouldThrowValidationConf Put put = preparePut(); TransactionResult txResult = prepareResult(ANY_ID); snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.put(get, Optional.of(txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); @@ -801,6 +784,7 @@ public void toSerializableWithExtraRead_ReadSetExtended_ShouldThrowValidationCon Get get = prepareAnotherGet(); Put put = preparePut(); snapshot.put(new Snapshot.Key(get), Optional.empty()); + snapshot.put(get, Optional.empty()); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = prepareResult(ANY_ID); @@ -826,7 +810,7 @@ public void toSerializableWithExtraRead_ScanSetNotChanged_ShouldProcessWithoutEx TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Scanner scanner = mock(Scanner.class); @@ -854,7 +838,7 @@ public void toSerializableWithExtraRead_ScanSetUpdated_ShouldThrowValidationConf TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, txResult)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); @@ -883,7 +867,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Scan scan = prepareScan(); Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID + "x"); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = new TransactionResult(result); @@ -954,8 +938,8 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Snapshot.Key key1 = new Snapshot.Key(scan1, result1); Snapshot.Key key2 = new Snapshot.Key(scan2, result2); - snapshot.put(scan1, Collections.singletonList(key1)); - snapshot.put(scan2, Collections.singletonList(key2)); + snapshot.put(scan1, Collections.singletonMap(key1, new TransactionResult(result1))); + snapshot.put(scan2, Collections.singletonMap(key2, new TransactionResult(result2))); snapshot.put(key1, Optional.of(new TransactionResult(result1))); snapshot.put(key2, Optional.of(new TransactionResult(result2))); @@ -1000,6 +984,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResultWithNullMetadata(); TransactionResult txResult = new TransactionResult(result); snapshot.put(new Snapshot.Key(get), Optional.of(result)); + snapshot.put(get, Optional.of(result)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -1024,6 +1009,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResultWithNullMetadata(); TransactionResult changedResult = prepareResult(ANY_ID); snapshot.put(new Snapshot.Key(get), Optional.of(result)); + snapshot.put(get, Optional.of(result)); snapshot.put(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = @@ -1048,7 +1034,7 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); snapshot.put(key, Optional.of(result)); - snapshot.put(scan, Collections.singletonList(key)); + snapshot.put(scan, Collections.singletonMap(key, result)); DistributedStorage storage = mock(DistributedStorage.class); Scan scanWithProjections = Scan.newBuilder(scan) @@ -1143,6 +1129,27 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc assertThat(thrown).doesNotThrowAnyException(); } + @Test + public void + verify_ScanGivenAndPutKeyAlreadyPresentInScanSet_ShouldThrowIllegalArgumentException() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Snapshot.Key putKey = new Snapshot.Key(put); + snapshot.put(putKey, put); + Scan scan = prepareScan(); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(key, Optional.of(result)); + snapshot.put(scan, Collections.singletonMap(key, result)); + + // Act Assert + Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); + + // Assert + assertThat(thrown).isInstanceOf(IllegalArgumentException.class); + } + @Test public void verify_ScanGivenAndPutWithSamePartitionKeyWithoutClusteringKeyInWriteSet_ShouldThrowIllegalArgumentException() { @@ -1152,6 +1159,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareScan(); + snapshot.put(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1175,6 +1183,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1217,6 +1226,11 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // ["text1", "text2") .withStart(new Key(ANY_NAME_2, ANY_TEXT_1), true) .withEnd(new Key(ANY_NAME_2, ANY_TEXT_2), false); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); + snapshot.put(scan4, Collections.emptyMap()); + snapshot.put(scan5, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1263,6 +1277,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1305,6 +1322,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + snapshot.put(scan1, Collections.emptyMap()); + snapshot.put(scan2, Collections.emptyMap()); + snapshot.put(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1330,8 +1350,9 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInSameTable_ShouldThrowExce .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1359,8 +1380,9 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInDifferentTable_ShouldNotT .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1399,8 +1421,9 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .table(ANY_TABLE_NAME) .indexKey(Key.ofText(ANY_NAME_4, ANY_TEXT_4)) .build(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1422,8 +1445,9 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - Snapshot.Key key = new Snapshot.Key(scanAll, prepareResult(ANY_ID)); - snapshot.put(scanAll, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1446,8 +1470,9 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME_2) .forTable(ANY_TABLE_NAME_2); - Snapshot.Key key = new Snapshot.Key(scanAll, prepareResult(ANY_ID)); - snapshot.put(scanAll, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1456,6 +1481,22 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( assertThat(thrown).doesNotThrowAnyException(); } + @Test + public void get_GetGivenAndAlreadyPresentInGetSet_ShouldReturnResult() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Get get = prepareGet(); + TransactionResult expected = prepareResult(ANY_ID); + snapshot.put(get, Optional.of(expected)); + + // Act + Optional actual = snapshot.get(get); + + // Assert + assertThat(actual).isPresent(); + assertThat(actual.get()).isEqualTo(expected); + } + @Test public void get_ScanAllGivenAndAlreadyPresentInScanSet_ShouldReturnKeys() { // Arrange @@ -1470,15 +1511,17 @@ public void get_ScanAllGivenAndAlreadyPresentInScanSet_ShouldReturnKeys() { .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME_2) .forTable(ANY_TABLE_NAME_2); - Snapshot.Key aKey = mock(Snapshot.Key.class); - snapshot.put(scanAll, Collections.singletonList(aKey)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scanAll, result); + snapshot.put(scanAll, Collections.singletonMap(key, result)); // Act Assert - Optional> keys = snapshot.get(scanAll); + Optional> results = snapshot.get(scanAll); // Assert - assertThat(keys).isNotEmpty(); - assertThat(keys.get()).containsExactly(aKey); + assertThat(results).isNotEmpty(); + assertThat(results.get()).containsKey(key); + assertThat(results.get().get(key)).isEqualTo(result); } @Test @@ -1489,8 +1532,9 @@ public void verify_RelationalScanGivenAndPutInSameTable_ShouldThrowException() { Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareRelationalScan(); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1507,8 +1551,9 @@ public void verify_RelationalScanGivenAndPutInDifferentNamespace_ShouldNotThrowE Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareRelationalScan(ANY_NAMESPACE_NAME_2, ANY_TABLE_NAME); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1525,8 +1570,9 @@ public void verify_RelationalScanGivenAndPutInDifferentTable_ShouldNotThrowExcep Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = prepareRelationalScan(ANY_NAMESPACE_NAME, ANY_TABLE_NAME_2); - Snapshot.Key key = new Snapshot.Key(scan, prepareResult(ANY_ID)); - snapshot.put(scan, Collections.singletonList(key)); + TransactionResult result = prepareResult(ANY_ID); + Snapshot.Key key = new Snapshot.Key(scan, result); + snapshot.put(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1560,7 +1606,7 @@ public void verify_RelationalScanGivenAndPutInDifferentTable_ShouldNotThrowExcep ConditionBuilder.column(ANY_NAME_8).isNullInt())) .build()) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1583,7 +1629,7 @@ public void verify_RelationalScanGivenAndPutInDifferentTable_ShouldNotThrowExcep .where(ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_4)) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1606,7 +1652,7 @@ public void verify_RelationalScanGivenAndPutInDifferentTable_ShouldNotThrowExcep .where(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_5).isEqualToText(ANY_TEXT_1)) .build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1624,7 +1670,7 @@ public void verify_RelationalScanGivenAndPutInDifferentTable_ShouldNotThrowExcep Snapshot.Key putKey = new Snapshot.Key(put); snapshot.put(putKey, put); Scan scan = Scan.newBuilder(prepareRelationalScan()).clearConditions().build(); - snapshot.put(scan, Collections.emptyList()); + snapshot.put(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java index cace63fd9b..6757612a5e 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitNullMetadataIntegrationTestBase.java @@ -1195,7 +1195,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException, ExecutionException { // Arrange populateRecordsWithNullMetadata(namespace1, TABLE_1); @@ -1215,7 +1215,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 18f30cdd79..c1a5cf4503 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -1025,7 +1025,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException { // Arrange DistributedTransaction transaction = manager.begin(); @@ -1047,7 +1047,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); } @@ -2473,6 +2474,8 @@ public void scan_DeleteGivenBefore_ShouldScan() throws TransactionException { // Assert assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1); } @Test @@ -2706,6 +2709,96 @@ public void scanAll_ScanAllGivenForPreparedWhenCoordinatorStateCommitted_ShouldR scanAll); } + @Test + public void scan_CalledTwice_ShouldReturnFromSnapshotInSecondTime() + throws TransactionException, ExecutionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.begin(); + Scan scan = prepareScan(0, 0, 0, namespace1, TABLE_1); + + // Act + List result1 = transaction.scan(scan); + List result2 = transaction.scan(scan); + transaction.commit(); + + // Assert + verify(storage).scan(any(Scan.class)); + assertThat(result1).isEqualTo(result2); + } + + @Test + public void scan_CalledTwiceWithSameConditionsAndDeleteHappenedInBetween_ShouldReadRepeatably() + throws TransactionException { + // Arrange + DistributedTransaction transaction = manager.begin(); + transaction.put(preparePut(0, 0, namespace1, TABLE_1)); + transaction.commit(); + + DistributedTransaction transaction1 = manager.begin(); + Scan scan = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .start(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + List result1 = transaction1.scan(scan); + + DistributedTransaction transaction2 = manager.begin(); + transaction2.get(prepareGet(0, 0, namespace1, TABLE_1)); + transaction2.delete(prepareDelete(0, 0, namespace1, TABLE_1)); + transaction2.commit(); + + // Act + List result2 = transaction1.scan(scan); + transaction1.commit(); + + // Assert + assertThat(result1.size()).isEqualTo(1); + assertThat(result2.size()).isEqualTo(1); + assertThat(result1.get(0)).isEqualTo(result2.get(0)); + } + + @Test + public void + scan_CalledTwiceWithDifferentConditionsAndInsertHappenedInBetween_ShouldNotReadRepeatably() + throws TransactionException { + // Arrange + DistributedTransaction transaction = manager.begin(); + transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1)); + transaction.commit(); + + DistributedTransaction transaction1 = manager.begin(); + Scan scan1 = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .end(Key.ofInt(ACCOUNT_TYPE, 2)) + .build(); + Scan scan2 = + Scan.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .end(Key.ofInt(ACCOUNT_TYPE, 3)) + .build(); + List result1 = transaction1.scan(scan1); + + DistributedTransaction transaction2 = manager.begin(); + transaction2.put(preparePut(0, 1, namespace1, TABLE_1)); + transaction2.commit(); + + // Act + List result2 = transaction1.scan(scan2); + transaction1.commit(); + + // Assert + assertThat(result1.size()).isEqualTo(1); + assertThat(result2.size()).isEqualTo(2); + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace, diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java index e410517d0a..de6eb1ccb9 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitSpecificIntegrationTestBase.java @@ -1016,7 +1016,7 @@ public void get_GetGivenForDeletedWhenCoordinatorStateNotExistAndExpired_ShouldA } @Test - public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() + public void getThenScanAndGet_CommitHappenedInBetween_OnlyGetShouldReadRepeatably() throws TransactionException { // Arrange TwoPhaseCommitTransaction transaction = manager1.begin(); @@ -1041,7 +1041,8 @@ public void getAndScan_CommitHappenedInBetween_ShouldReadRepeatably() // Assert assertThat(result1).isPresent(); - assertThat(result1.get()).isEqualTo(result2); + assertThat(result1.get()).isNotEqualTo(result2); + assertThat(result2.getInt(BALANCE)).isEqualTo(2); assertThat(result1).isEqualTo(result3); }