diff --git a/core/src/main/java/com/scalar/db/api/CrudOperable.java b/core/src/main/java/com/scalar/db/api/CrudOperable.java new file mode 100644 index 0000000000..a773f3e93a --- /dev/null +++ b/core/src/main/java/com/scalar/db/api/CrudOperable.java @@ -0,0 +1,134 @@ +package com.scalar.db.api; + +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.PreparationConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnsatisfiedConditionException; +import java.util.List; +import java.util.Optional; + +/** + * An interface for transactional CRUD operations. Note that the LINEARIZABLE consistency level is + * always used in transactional CRUD operations, so {@link Consistency} specified for CRUD + * operations is ignored. + */ +public interface CrudOperable { + + /** + * Retrieves a result from the storage through a transaction with the specified {@link Get} + * command with a primary key and returns the result. + * + * @param get a {@code Get} command + * @return an {@code Optional} with the returned result + * @throws E if the transaction CRUD operation fails + */ + Optional get(Get get) throws E; + + /** + * Retrieves results from the storage through a transaction with the specified {@link Scan} + * command with a partition key and returns a list of {@link Result}. Results can be filtered by + * specifying a range of clustering keys. + * + * @param scan a {@code Scan} command + * @return a list of {@link Result} + * @throws E if the transaction CRUD operation fails + */ + List scan(Scan scan) throws E; + + /** + * Inserts an entry into or updates an entry in the underlying storage through a transaction with + * the specified {@link Put} command. If a condition is specified in the {@link Put} command, and + * if the condition is not satisfied or the entry does not exist, it throws {@link + * UnsatisfiedConditionException}. + * + * @param put a {@code Put} command + * @throws E if the transaction CRUD operation fails + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. + */ + @Deprecated + void put(Put put) throws E; + + /** + * Inserts multiple entries into or updates multiple entries in the underlying storage through a + * transaction with the specified list of {@link Put} commands. If a condition is specified in the + * {@link Put} command, and if the condition is not satisfied or the entry does not exist, it + * throws {@link UnsatisfiedConditionException}. + * + * @param puts a list of {@code Put} commands + * @throws E if the transaction CRUD operation fails + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} + * instead. + */ + @Deprecated + void put(List puts) throws E; + + /** + * Inserts an entry into the underlying storage through a transaction with the specified {@link + * Insert} command. If the entry already exists, a conflict error occurs. Note that the location + * where the conflict error is thrown depends on the implementation of the transaction manager. + * This method may throw {@link CrudConflictException}. Alternatively, {@link + * DistributedTransaction#commit()} or {@link TwoPhaseCommitTransaction#prepare()} may throw + * {@link CommitConflictException} or {@link PreparationConflictException} respectively in case of + * a conflict error. + * + * @param insert a {@code Insert} command + * @throws E if the transaction CRUD operation fails + */ + void insert(Insert insert) throws E; + + /** + * Inserts an entry into or updates an entry in the underlying storage through a transaction with + * the specified {@link Upsert} command. If the entry already exists, it is updated; otherwise, it + * is inserted. + * + * @param upsert a {@code Upsert} command + * @throws E if the transaction CRUD operation fails + */ + void upsert(Upsert upsert) throws E; + + /** + * Updates an entry in the underlying storage through a transaction with the specified {@link + * Update} command. If the entry does not exist, it does nothing. If a condition is specified in + * the {@link Update} command, and if the condition is not satisfied or the entry does not exist, + * it throws {@link UnsatisfiedConditionException}. + * + * @param update an {@code Update} command + * @throws E if the transaction CRUD operation fails + */ + void update(Update update) throws E; + + /** + * Deletes an entry from the underlying storage through a transaction with the specified {@link + * Delete} command. If a condition is specified in the {@link Delete} command, and if the + * condition is not satisfied or the entry does not exist, it throws {@link + * UnsatisfiedConditionException}. + * + * @param delete a {@code Delete} command + * @throws E if the transaction CRUD operation fails + */ + void delete(Delete delete) throws E; + + /** + * Deletes entries from the underlying storage through a transaction with the specified list of + * {@link Delete} commands. If a condition is specified in the {@link Delete} command, and if the + * condition is not satisfied or the entry does not exist, it throws {@link + * UnsatisfiedConditionException}. + * + * @param deletes a list of {@code Delete} commands + * @throws E if the transaction CRUD operation fails + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} + * instead. + */ + @Deprecated + void delete(List deletes) throws E; + + /** + * Mutates entries of the underlying storage through a transaction with the specified list of + * {@link Mutation} commands. + * + * @param mutations a list of {@code Mutation} commands + * @throws E if the transaction CRUD operation fails + */ + void mutate(List mutations) throws E; +} diff --git a/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java b/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java index e5c8c4d2ed..f9c84cfacf 100644 --- a/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java @@ -4,7 +4,8 @@ import com.scalar.db.exception.transaction.TransactionNotFoundException; import java.util.Optional; -public interface DistributedTransactionManager { +public interface DistributedTransactionManager + extends TransactionManagerCrudOperable, AutoCloseable { /** * Sets the specified namespace and the table name as default values in the instance. @@ -251,5 +252,6 @@ default TransactionState abort(String txId) throws TransactionException { * Closes connections to the cluster. The connections are shared among multiple services such as * StorageService and TransactionService, thus this should only be used when closing applications. */ + @Override void close(); } diff --git a/core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java b/core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java index 224e5f2305..c8303f7a90 100644 --- a/core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java +++ b/core/src/main/java/com/scalar/db/api/TransactionCrudOperable.java @@ -1,56 +1,41 @@ package com.scalar.db.api; -import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.CrudException; -import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.UnsatisfiedConditionException; import java.util.List; import java.util.Optional; -/** - * An interface for transactional CRUD operations. Note that the LINEARIZABLE consistency level is - * always used in transactional CRUD operations, so {@link Consistency} specified for CRUD - * operations is ignored. - */ -public interface TransactionCrudOperable { +/** An interface for transactional CRUD operations for transactions. */ +public interface TransactionCrudOperable extends CrudOperable { /** - * Retrieves a result from the storage through a transaction with the specified {@link Get} - * command with a primary key and returns the result. + * {@inheritDoc} * - * @param get a {@code Get} command - * @return an {@code Optional} with the returned result * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient */ + @Override Optional get(Get get) throws CrudConflictException, CrudException; /** - * Retrieves results from the storage through a transaction with the specified {@link Scan} - * command with a partition key and returns a list of {@link Result}. Results can be filtered by - * specifying a range of clustering keys. + * {@inheritDoc} * - * @param scan a {@code Scan} command - * @return a list of {@link Result} * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient */ + @Override List scan(Scan scan) throws CrudConflictException, CrudException; /** - * Inserts an entry into or updates an entry in the underlying storage through a transaction with - * the specified {@link Put} command. If a condition is specified in the {@link Put} command, and - * if the condition is not satisfied or the entry does not exist, it throws {@link - * UnsatisfiedConditionException}. + * {@inheritDoc} * - * @param put a {@code Put} command * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient @@ -61,15 +46,12 @@ public interface TransactionCrudOperable { * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated + @Override void put(Put put) throws CrudConflictException, CrudException, UnsatisfiedConditionException; /** - * Inserts multiple entries into or updates multiple entries in the underlying storage through a - * transaction with the specified list of {@link Put} commands. If a condition is specified in the - * {@link Put} command, and if the condition is not satisfied or the entry does not exist, it - * throws {@link UnsatisfiedConditionException}. + * {@inheritDoc} * - * @param puts a list of {@code Put} commands * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient @@ -81,87 +63,67 @@ public interface TransactionCrudOperable { * instead. */ @Deprecated + @Override void put(List puts) throws CrudConflictException, CrudException, UnsatisfiedConditionException; /** - * Deletes an entry from the underlying storage through a transaction with the specified {@link - * Delete} command. If a condition is specified in the {@link Delete} command, and if the - * condition is not satisfied or the entry does not exist, it throws {@link - * UnsatisfiedConditionException}. + * {@inheritDoc} * - * @param delete a {@code Delete} command * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient - * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not - * satisfied or the entry does not exist */ - void delete(Delete delete) - throws CrudConflictException, CrudException, UnsatisfiedConditionException; + @Override + void insert(Insert insert) throws CrudConflictException, CrudException; /** - * Deletes entries from the underlying storage through a transaction with the specified list of - * {@link Delete} commands. If a condition is specified in the {@link Delete} command, and if the - * condition is not satisfied or the entry does not exist, it throws {@link - * UnsatisfiedConditionException}. + * {@inheritDoc} * - * @param deletes a list of {@code Delete} commands * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient - * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not - * satisfied or the entry does not exist - * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} - * instead. */ - @Deprecated - void delete(List deletes) - throws CrudConflictException, CrudException, UnsatisfiedConditionException; + @Override + void upsert(Upsert upsert) throws CrudConflictException, CrudException; /** - * Inserts an entry into the underlying storage through a transaction with the specified {@link - * Insert} command. If the entry already exists, a conflict error occurs. Note that the location - * where the conflict error is thrown depends on the implementation of the transaction manager. - * This method may throw {@link CrudConflictException}. Alternatively, {@link - * DistributedTransaction#commit()} or {@link TwoPhaseCommitTransaction#prepare()} may throw - * {@link CommitConflictException} or {@link PreparationConflictException} respectively in case of - * a conflict error. + * {@inheritDoc} * - * @param insert a {@code Insert} command * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist */ - void insert(Insert insert) throws CrudConflictException, CrudException; + @Override + void update(Update update) + throws CrudConflictException, CrudException, UnsatisfiedConditionException; /** - * Inserts an entry into or updates an entry in the underlying storage through a transaction with - * the specified {@link Upsert} command. If the entry already exists, it is updated; otherwise, it - * is inserted. + * {@inheritDoc} * - * @param upsert a {@code Upsert} command * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient * faults. You can try retrying the transaction from the beginning, but the transaction may * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist */ - void upsert(Upsert upsert) throws CrudConflictException, CrudException; + @Override + void delete(Delete delete) + throws CrudConflictException, CrudException, UnsatisfiedConditionException; /** - * Updates an entry in the underlying storage through a transaction with the specified {@link - * Update} command. If the entry does not exist, it does nothing. If a condition is specified in - * the {@link Update} command, and if the condition is not satisfied or the entry does not exist, - * it throws {@link UnsatisfiedConditionException}. + * {@inheritDoc} * - * @param update an {@code Update} command * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient @@ -169,15 +131,17 @@ void delete(List deletes) * still fail if the cause is nontranient * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not * satisfied or the entry does not exist + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} + * instead. */ - void update(Update update) + @Deprecated + @Override + void delete(List deletes) throws CrudConflictException, CrudException, UnsatisfiedConditionException; /** - * Mutates entries of the underlying storage through a transaction with the specified list of - * {@link Mutation} commands. + * {@inheritDoc} * - * @param mutations a list of {@code Mutation} commands * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults * (e.g., a conflict error). You can retry the transaction from the beginning * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient @@ -187,6 +151,7 @@ void update(Update update) * Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does * not exist */ + @Override void mutate(List mutations) throws CrudConflictException, CrudException, UnsatisfiedConditionException; } diff --git a/core/src/main/java/com/scalar/db/api/TransactionManagerCrudOperable.java b/core/src/main/java/com/scalar/db/api/TransactionManagerCrudOperable.java new file mode 100644 index 0000000000..eb285a2d54 --- /dev/null +++ b/core/src/main/java/com/scalar/db/api/TransactionManagerCrudOperable.java @@ -0,0 +1,180 @@ +package com.scalar.db.api; + +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.exception.transaction.UnsatisfiedConditionException; +import java.util.List; +import java.util.Optional; + +/** An interface for transactional CRUD operations for transaction managers. */ +public interface TransactionManagerCrudOperable extends CrudOperable { + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + Optional get(Get get) + throws CrudConflictException, CrudException, UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + List scan(Scan scan) + throws CrudConflictException, CrudException, UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. + */ + @Deprecated + @Override + void put(Put put) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} + * instead. + */ + @Deprecated + @Override + void put(List puts) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + void insert(Insert insert) + throws CrudConflictException, CrudException, UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + void upsert(Upsert upsert) + throws CrudConflictException, CrudException, UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + void update(Update update) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + void delete(Delete delete) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified, and if the condition is not + * satisfied or the entry does not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + * @deprecated As of release 3.13.0. Will be removed in release 5.0.0. Use {@link #mutate(List)} + * instead. + */ + @Deprecated + @Override + void delete(List deletes) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; + + /** + * {@inheritDoc} + * + * @throws CrudConflictException if the transaction CRUD operation fails due to transient faults + * (e.g., a conflict error). You can retry the transaction from the beginning + * @throws CrudException if the transaction CRUD operation fails due to transient or nontransient + * faults. You can try retrying the transaction from the beginning, but the transaction may + * still fail if the cause is nontranient + * @throws UnsatisfiedConditionException if a condition is specified in a {@link Put}, {@link + * Delete}, or {@link Update} command, and if the condition is not satisfied or the entry does + * not exist + * @throws UnknownTransactionStatusException if the status of the commit is unknown + */ + @Override + void mutate(List mutations) + throws CrudConflictException, CrudException, UnsatisfiedConditionException, + UnknownTransactionStatusException; +} diff --git a/core/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionManager.java index 3ceb3bc77d..33fe5f680f 100644 --- a/core/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionManager.java @@ -13,7 +13,8 @@ * join the transaction with {@link #join(String)} with the transaction ID. Also, participants can * resume the transaction with {@link #resume(String)} with the transaction ID. */ -public interface TwoPhaseCommitTransactionManager { +public interface TwoPhaseCommitTransactionManager + extends TransactionManagerCrudOperable, AutoCloseable { /** * Sets the specified namespace and the table name as default values in the instance. @@ -179,5 +180,6 @@ default TransactionState abort(String txId) throws TransactionException { * Closes connections to the cluster. The connections are shared among multiple services such as * StorageService and TransactionService, thus this should only be used when closing applications. */ + @Override void close(); } diff --git a/core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java index db99a77451..a2bde9838b 100644 --- a/core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/AbstractDistributedTransactionManager.java @@ -1,6 +1,5 @@ package com.scalar.db.common; -import com.google.common.annotations.VisibleForTesting; import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; @@ -12,32 +11,33 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; -import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; -import com.scalar.db.exception.transaction.AbortException; -import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.TransactionNotFoundException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.util.ScalarDbUtils; +import com.scalar.db.util.ThrowableFunction; import java.util.List; import java.util.Optional; -import java.util.concurrent.CopyOnWriteArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractDistributedTransactionManager - implements DistributedTransactionManager, DistributedTransactionDecoratorAddable { + implements DistributedTransactionManager { + + private static final Logger logger = + LoggerFactory.getLogger(AbstractDistributedTransactionManager.class); private Optional namespace; private Optional tableName; - private final List transactionDecorators = - new CopyOnWriteArrayList<>(); - - public AbstractDistributedTransactionManager(DatabaseConfig databaseConfig) { - namespace = databaseConfig.getDefaultNamespaceName(); + public AbstractDistributedTransactionManager(DatabaseConfig config) { + namespace = config.getDefaultNamespaceName(); tableName = Optional.empty(); - - addTransactionDecorator(StateManagedTransaction::new); } /** @deprecated As of release 3.6.0. Will be removed in release 5.0.0 */ @@ -76,151 +76,159 @@ public Optional getTable() { return tableName; } - protected DistributedTransaction decorate(DistributedTransaction transaction) - throws TransactionException { - DistributedTransaction decorated = transaction; - for (DistributedTransactionDecorator transactionDecorator : transactionDecorators) { - decorated = transactionDecorator.decorate(decorated); - } - return decorated; + @Override + public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); } @Override - public void addTransactionDecorator(DistributedTransactionDecorator transactionDecorator) { - transactionDecorators.add(transactionDecorator); - } - - /** - * This class is to unify the call sequence of the transaction object. It doesn't care about the - * potential inconsistency between the status field on JVM memory and the underlying persistent - * layer. - */ - @VisibleForTesting - static class StateManagedTransaction extends DecoratedDistributedTransaction { - - private enum Status { - ACTIVE, - COMMITTED, - COMMIT_FAILED, - ROLLED_BACK - } + public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); + } - private Status status; + @Deprecated + @Override + public void put(Put put) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.put(copyAndSetTargetToIfNot(put)); + return null; + }); + } - @VisibleForTesting - StateManagedTransaction(DistributedTransaction transaction) { - super(transaction); - status = Status.ACTIVE; - } + @Deprecated + @Override + public void put(List puts) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.put(copyAndSetTargetToIfNot(puts)); + return null; + }); + } - @Override - public Optional get(Get get) throws CrudException { - checkIfActive(); - return super.get(get); - } + @Override + public void insert(Insert insert) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.insert(copyAndSetTargetToIfNot(insert)); + return null; + }); + } - @Override - public List scan(Scan scan) throws CrudException { - checkIfActive(); - return super.scan(scan); - } + @Override + public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.upsert(copyAndSetTargetToIfNot(upsert)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void put(Put put) throws CrudException { - checkIfActive(); - super.put(put); - } + @Override + public void update(Update update) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.update(copyAndSetTargetToIfNot(update)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void put(List puts) throws CrudException { - checkIfActive(); - super.put(puts); - } + @Override + public void delete(Delete delete) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.delete(copyAndSetTargetToIfNot(delete)); + return null; + }); + } - @Override - public void delete(Delete delete) throws CrudException { - checkIfActive(); - super.delete(delete); - } + @Deprecated + @Override + public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.delete(copyAndSetTargetToIfNot(deletes)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void delete(List deletes) throws CrudException { - checkIfActive(); - super.delete(deletes); - } + @Override + public void mutate(List mutations) + throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.mutate(copyAndSetTargetToIfNot(mutations)); + return null; + }); + } - @Override - public void insert(Insert insert) throws CrudException { - checkIfActive(); - super.insert(insert); + private R executeTransaction( + ThrowableFunction throwableFunction) + throws CrudException, UnknownTransactionStatusException { + DistributedTransaction transaction; + try { + transaction = begin(); + } catch (TransactionNotFoundException e) { + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (TransactionException e) { + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + + try { + R result = throwableFunction.apply(transaction); + transaction.commit(); + return result; + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } catch (CommitConflictException e) { + rollbackTransaction(transaction); + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (UnknownTransactionStatusException e) { + throw e; + } catch (TransactionException e) { + rollbackTransaction(transaction); + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); } + } - @Override - public void upsert(Upsert upsert) throws CrudException { - checkIfActive(); - super.upsert(upsert); + private void rollbackTransaction(DistributedTransaction transaction) { + try { + transaction.rollback(); + } catch (RollbackException e) { + logger.warn("Rolling back the transaction failed", e); } + } - @Override - public void update(Update update) throws CrudException { - checkIfActive(); - super.update(update); - } + protected List copyAndSetTargetToIfNot(List mutations) { + return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName); + } - @Override - public void mutate(List mutations) throws CrudException { - checkIfActive(); - super.mutate(mutations); - } + protected Get copyAndSetTargetToIfNot(Get get) { + return ScalarDbUtils.copyAndSetTargetToIfNot(get, namespace, tableName); + } - @Override - public void commit() throws CommitException, UnknownTransactionStatusException { - checkIfActive(); - try { - super.commit(); - status = Status.COMMITTED; - } catch (Exception e) { - status = Status.COMMIT_FAILED; - throw e; - } - } + protected Scan copyAndSetTargetToIfNot(Scan scan) { + return ScalarDbUtils.copyAndSetTargetToIfNot(scan, namespace, tableName); + } - @Override - public void rollback() throws RollbackException { - if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { - throw new IllegalStateException( - CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); - } - try { - super.rollback(); - } finally { - status = Status.ROLLED_BACK; - } - } + protected Put copyAndSetTargetToIfNot(Put put) { + return ScalarDbUtils.copyAndSetTargetToIfNot(put, namespace, tableName); + } - @Override - public void abort() throws AbortException { - if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { - throw new IllegalStateException( - CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); - } - try { - super.abort(); - } finally { - status = Status.ROLLED_BACK; - } - } + protected Delete copyAndSetTargetToIfNot(Delete delete) { + return ScalarDbUtils.copyAndSetTargetToIfNot(delete, namespace, tableName); + } - private void checkIfActive() { - if (status != Status.ACTIVE) { - throw new IllegalStateException(CoreError.TRANSACTION_NOT_ACTIVE.buildMessage(status)); - } - } + protected Insert copyAndSetTargetToIfNot(Insert insert) { + return ScalarDbUtils.copyAndSetTargetToIfNot(insert, namespace, tableName); + } + + protected Upsert copyAndSetTargetToIfNot(Upsert upsert) { + return ScalarDbUtils.copyAndSetTargetToIfNot(upsert, namespace, tableName); + } + + protected Update copyAndSetTargetToIfNot(Update update) { + return ScalarDbUtils.copyAndSetTargetToIfNot(update, namespace, tableName); } } diff --git a/core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java index da52107cb7..3cec961087 100644 --- a/core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManager.java @@ -1,6 +1,5 @@ package com.scalar.db.common; -import com.google.common.annotations.VisibleForTesting; import com.scalar.db.api.Delete; import com.scalar.db.api.Get; import com.scalar.db.api.Insert; @@ -12,34 +11,35 @@ import com.scalar.db.api.TwoPhaseCommitTransactionManager; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; -import com.scalar.db.common.error.CoreError; import com.scalar.db.config.DatabaseConfig; -import com.scalar.db.exception.transaction.AbortException; -import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.CrudException; -import com.scalar.db.exception.transaction.PreparationException; +import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.TransactionNotFoundException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; -import com.scalar.db.exception.transaction.ValidationException; +import com.scalar.db.exception.transaction.ValidationConflictException; +import com.scalar.db.util.ScalarDbUtils; +import com.scalar.db.util.ThrowableFunction; import java.util.List; import java.util.Optional; -import java.util.concurrent.CopyOnWriteArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractTwoPhaseCommitTransactionManager - implements TwoPhaseCommitTransactionManager, TwoPhaseCommitTransactionDecoratorAddable { + implements TwoPhaseCommitTransactionManager { + + private static final Logger logger = + LoggerFactory.getLogger(AbstractTwoPhaseCommitTransactionManager.class); private Optional namespace; private Optional tableName; - private final List transactionDecorators = - new CopyOnWriteArrayList<>(); - public AbstractTwoPhaseCommitTransactionManager(DatabaseConfig config) { namespace = config.getDefaultNamespaceName(); tableName = Optional.empty(); - - addTransactionDecorator(StateManagedTransaction::new); } /** @deprecated As of release 3.6.0. Will be removed in release 5.0.0 */ @@ -78,186 +78,163 @@ public Optional getTable() { return tableName; } - protected TwoPhaseCommitTransaction decorate(TwoPhaseCommitTransaction transaction) - throws TransactionException { - TwoPhaseCommitTransaction decorated = transaction; - for (TwoPhaseCommitTransactionDecorator transactionDecorator : transactionDecorators) { - decorated = transactionDecorator.decorate(decorated); - } - return decorated; + @Override + public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); } @Override - public void addTransactionDecorator(TwoPhaseCommitTransactionDecorator transactionDecorator) { - transactionDecorators.add(transactionDecorator); + public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); } - /** - * This class is to unify the call sequence of the transaction object. It doesn't care about the - * potential inconsistency between the status field on JVM memory and the underlying persistent - * layer. - */ - @VisibleForTesting - static class StateManagedTransaction extends DecoratedTwoPhaseCommitTransaction { - - private enum Status { - ACTIVE, - PREPARED, - PREPARE_FAILED, - VALIDATED, - VALIDATION_FAILED, - COMMITTED, - COMMIT_FAILED, - ROLLED_BACK - } - - private Status status; - - @VisibleForTesting - StateManagedTransaction(TwoPhaseCommitTransaction transaction) { - super(transaction); - status = Status.ACTIVE; - } - - @Override - public Optional get(Get get) throws CrudException { - checkIfActive(); - return super.get(get); - } + @Deprecated + @Override + public void put(Put put) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.put(copyAndSetTargetToIfNot(put)); + return null; + }); + } - @Override - public List scan(Scan scan) throws CrudException { - checkIfActive(); - return super.scan(scan); - } + @Deprecated + @Override + public void put(List puts) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.put(copyAndSetTargetToIfNot(puts)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void put(Put put) throws CrudException { - checkIfActive(); - super.put(put); - } + @Override + public void insert(Insert insert) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.insert(copyAndSetTargetToIfNot(insert)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void put(List puts) throws CrudException { - checkIfActive(); - super.put(puts); - } + @Override + public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.upsert(copyAndSetTargetToIfNot(upsert)); + return null; + }); + } - @Override - public void delete(Delete delete) throws CrudException { - checkIfActive(); - super.delete(delete); - } + @Override + public void update(Update update) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.update(copyAndSetTargetToIfNot(update)); + return null; + }); + } - /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ - @Deprecated - @Override - public void delete(List deletes) throws CrudException { - checkIfActive(); - super.delete(deletes); - } + @Override + public void delete(Delete delete) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.delete(copyAndSetTargetToIfNot(delete)); + return null; + }); + } - @Override - public void insert(Insert insert) throws CrudException { - checkIfActive(); - super.insert(insert); - } + @Deprecated + @Override + public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.delete(copyAndSetTargetToIfNot(deletes)); + return null; + }); + } - @Override - public void upsert(Upsert upsert) throws CrudException { - checkIfActive(); - super.upsert(upsert); - } + @Override + public void mutate(List mutations) + throws CrudException, UnknownTransactionStatusException { + executeTransaction( + t -> { + t.mutate(copyAndSetTargetToIfNot(mutations)); + return null; + }); + } - @Override - public void update(Update update) throws CrudException { - checkIfActive(); - super.update(update); + private R executeTransaction( + ThrowableFunction throwableFunction) + throws CrudException, UnknownTransactionStatusException { + TwoPhaseCommitTransaction transaction; + try { + transaction = begin(); + } catch (TransactionNotFoundException e) { + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (TransactionException e) { + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + + try { + R result = throwableFunction.apply(transaction); + transaction.prepare(); + transaction.validate(); + transaction.commit(); + return result; + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } catch (PreparationConflictException + | ValidationConflictException + | CommitConflictException e) { + rollbackTransaction(transaction); + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (UnknownTransactionStatusException e) { + throw e; + } catch (TransactionException e) { + rollbackTransaction(transaction); + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); } + } - @Override - public void mutate(List mutations) throws CrudException { - checkIfActive(); - super.mutate(mutations); + private void rollbackTransaction(TwoPhaseCommitTransaction transaction) { + try { + transaction.rollback(); + } catch (RollbackException e) { + logger.warn("Rolling back the transaction failed", e); } + } - @Override - public void prepare() throws PreparationException { - checkIfActive(); - try { - super.prepare(); - status = Status.PREPARED; - } catch (Exception e) { - status = Status.PREPARE_FAILED; - throw e; - } - } + protected List copyAndSetTargetToIfNot(List mutations) { + return ScalarDbUtils.copyAndSetTargetToIfNot(mutations, namespace, tableName); + } - @Override - public void validate() throws ValidationException { - if (status != Status.PREPARED) { - throw new IllegalStateException(CoreError.TRANSACTION_NOT_PREPARED.buildMessage(status)); - } + protected Get copyAndSetTargetToIfNot(Get get) { + return ScalarDbUtils.copyAndSetTargetToIfNot(get, namespace, tableName); + } - try { - super.validate(); - status = Status.VALIDATED; - } catch (Exception e) { - status = Status.VALIDATION_FAILED; - throw e; - } - } + protected Scan copyAndSetTargetToIfNot(Scan scan) { + return ScalarDbUtils.copyAndSetTargetToIfNot(scan, namespace, tableName); + } - @Override - public void commit() throws CommitException, UnknownTransactionStatusException { - if (status != Status.PREPARED && status != Status.VALIDATED) { - throw new IllegalStateException( - CoreError.TRANSACTION_NOT_PREPARED_OR_VALIDATED.buildMessage(status)); - } + protected Put copyAndSetTargetToIfNot(Put put) { + return ScalarDbUtils.copyAndSetTargetToIfNot(put, namespace, tableName); + } - try { - super.commit(); - status = Status.COMMITTED; - } catch (Exception e) { - status = Status.COMMIT_FAILED; - throw e; - } - } + protected Delete copyAndSetTargetToIfNot(Delete delete) { + return ScalarDbUtils.copyAndSetTargetToIfNot(delete, namespace, tableName); + } - @Override - public void rollback() throws RollbackException { - if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { - throw new IllegalStateException( - CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); - } - try { - super.rollback(); - } finally { - status = Status.ROLLED_BACK; - } - } + protected Insert copyAndSetTargetToIfNot(Insert insert) { + return ScalarDbUtils.copyAndSetTargetToIfNot(insert, namespace, tableName); + } - @Override - public void abort() throws AbortException { - if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { - throw new IllegalStateException( - CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); - } - try { - super.abort(); - } finally { - status = Status.ROLLED_BACK; - } - } + protected Upsert copyAndSetTargetToIfNot(Upsert upsert) { + return ScalarDbUtils.copyAndSetTargetToIfNot(upsert, namespace, tableName); + } - private void checkIfActive() { - if (status != Status.ACTIVE) { - throw new IllegalStateException(CoreError.TRANSACTION_NOT_ACTIVE.buildMessage(status)); - } - } + protected Update copyAndSetTargetToIfNot(Update update) { + return ScalarDbUtils.copyAndSetTargetToIfNot(update, namespace, tableName); } } diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java index c1d3598473..b15abc8bcd 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public abstract class ActiveTransactionManagedDistributedTransactionManager - extends AbstractDistributedTransactionManager { + extends TransactionDecorationDistributedTransactionManager { private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000; diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java index e107fc5e03..641cf7d3cb 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; public abstract class ActiveTransactionManagedTwoPhaseCommitTransactionManager - extends AbstractTwoPhaseCommitTransactionManager { + extends TransactionDecorationTwoPhaseCommitTransactionManager { private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000; diff --git a/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransaction.java b/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransaction.java index f222bf8aa7..16308f33b7 100644 --- a/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransaction.java +++ b/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransaction.java @@ -15,6 +15,7 @@ import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Optional; @@ -22,6 +23,7 @@ public abstract class DecoratedDistributedTransaction implements DistributedTran private final DistributedTransaction decoratedTransaction; + @SuppressFBWarnings("EI_EXPOSE_REP2") public DecoratedDistributedTransaction(DistributedTransaction decoratedTransaction) { this.decoratedTransaction = decoratedTransaction; } @@ -137,6 +139,7 @@ public void abort() throws AbortException { decoratedTransaction.abort(); } + @SuppressFBWarnings("EI_EXPOSE_REP") public DistributedTransaction getOriginalTransaction() { if (decoratedTransaction instanceof DecoratedDistributedTransaction) { return ((DecoratedDistributedTransaction) decoratedTransaction).getOriginalTransaction(); diff --git a/core/src/main/java/com/scalar/db/common/DecoratedTwoPhaseCommitTransaction.java b/core/src/main/java/com/scalar/db/common/DecoratedTwoPhaseCommitTransaction.java index 80edf0616f..982959c376 100644 --- a/core/src/main/java/com/scalar/db/common/DecoratedTwoPhaseCommitTransaction.java +++ b/core/src/main/java/com/scalar/db/common/DecoratedTwoPhaseCommitTransaction.java @@ -17,6 +17,7 @@ import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.db.exception.transaction.ValidationException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Optional; @@ -24,6 +25,7 @@ public abstract class DecoratedTwoPhaseCommitTransaction implements TwoPhaseComm private final TwoPhaseCommitTransaction decoratedTransaction; + @SuppressFBWarnings("EI_EXPOSE_REP2") public DecoratedTwoPhaseCommitTransaction(TwoPhaseCommitTransaction decoratedTransaction) { this.decoratedTransaction = decoratedTransaction; } @@ -149,6 +151,7 @@ public void abort() throws AbortException { decoratedTransaction.abort(); } + @SuppressFBWarnings("EI_EXPOSE_REP") public TwoPhaseCommitTransaction getOriginalTransaction() { if (decoratedTransaction instanceof DecoratedTwoPhaseCommitTransaction) { return ((DecoratedTwoPhaseCommitTransaction) decoratedTransaction).getOriginalTransaction(); diff --git a/core/src/main/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManager.java new file mode 100644 index 0000000000..f0cc50d66d --- /dev/null +++ b/core/src/main/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManager.java @@ -0,0 +1,187 @@ +package com.scalar.db.common; + +import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.Delete; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.config.DatabaseConfig; +import com.scalar.db.exception.transaction.AbortException; +import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.RollbackException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class TransactionDecorationDistributedTransactionManager + extends AbstractDistributedTransactionManager + implements DistributedTransactionDecoratorAddable { + + private final List transactionDecorators = + new CopyOnWriteArrayList<>(); + + public TransactionDecorationDistributedTransactionManager(DatabaseConfig config) { + super(config); + + // Add the StateManagedTransaction decorator by default + addTransactionDecorator(StateManagedTransaction::new); + } + + protected DistributedTransaction decorate(DistributedTransaction transaction) + throws TransactionException { + DistributedTransaction decorated = transaction; + for (DistributedTransactionDecorator transactionDecorator : transactionDecorators) { + decorated = transactionDecorator.decorate(decorated); + } + return decorated; + } + + @Override + public void addTransactionDecorator(DistributedTransactionDecorator transactionDecorator) { + transactionDecorators.add(transactionDecorator); + } + + /** + * This class is to unify the call sequence of the transaction object. It doesn't care about the + * potential inconsistency between the status field on JVM memory and the underlying persistent + * layer. + */ + @VisibleForTesting + static class StateManagedTransaction extends DecoratedDistributedTransaction { + + private enum Status { + ACTIVE, + COMMITTED, + COMMIT_FAILED, + ROLLED_BACK + } + + private Status status; + + @VisibleForTesting + StateManagedTransaction(DistributedTransaction transaction) { + super(transaction); + status = Status.ACTIVE; + } + + @Override + public Optional get(Get get) throws CrudException { + checkIfActive(); + return super.get(get); + } + + @Override + public List scan(Scan scan) throws CrudException { + checkIfActive(); + return super.scan(scan); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(Put put) throws CrudException { + checkIfActive(); + super.put(put); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(List puts) throws CrudException { + checkIfActive(); + super.put(puts); + } + + @Override + public void delete(Delete delete) throws CrudException { + checkIfActive(); + super.delete(delete); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void delete(List deletes) throws CrudException { + checkIfActive(); + super.delete(deletes); + } + + @Override + public void insert(Insert insert) throws CrudException { + checkIfActive(); + super.insert(insert); + } + + @Override + public void upsert(Upsert upsert) throws CrudException { + checkIfActive(); + super.upsert(upsert); + } + + @Override + public void update(Update update) throws CrudException { + checkIfActive(); + super.update(update); + } + + @Override + public void mutate(List mutations) throws CrudException { + checkIfActive(); + super.mutate(mutations); + } + + @Override + public void commit() throws CommitException, UnknownTransactionStatusException { + checkIfActive(); + try { + super.commit(); + status = Status.COMMITTED; + } catch (Exception e) { + status = Status.COMMIT_FAILED; + throw e; + } + } + + @Override + public void rollback() throws RollbackException { + if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { + throw new IllegalStateException( + CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); + } + try { + super.rollback(); + } finally { + status = Status.ROLLED_BACK; + } + } + + @Override + public void abort() throws AbortException { + if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { + throw new IllegalStateException( + CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); + } + try { + super.abort(); + } finally { + status = Status.ROLLED_BACK; + } + } + + private void checkIfActive() { + if (status != Status.ACTIVE) { + throw new IllegalStateException(CoreError.TRANSACTION_NOT_ACTIVE.buildMessage(status)); + } + } + } +} diff --git a/core/src/main/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManager.java new file mode 100644 index 0000000000..d1463ac447 --- /dev/null +++ b/core/src/main/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManager.java @@ -0,0 +1,224 @@ +package com.scalar.db.common; + +import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.Delete; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; +import com.scalar.db.api.TwoPhaseCommitTransaction; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.config.DatabaseConfig; +import com.scalar.db.exception.transaction.AbortException; +import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.PreparationException; +import com.scalar.db.exception.transaction.RollbackException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.exception.transaction.ValidationException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class TransactionDecorationTwoPhaseCommitTransactionManager + extends AbstractTwoPhaseCommitTransactionManager + implements TwoPhaseCommitTransactionDecoratorAddable { + + private final List transactionDecorators = + new CopyOnWriteArrayList<>(); + + public TransactionDecorationTwoPhaseCommitTransactionManager(DatabaseConfig config) { + super(config); + + // Add the StateManagedTransaction decorator by default + addTransactionDecorator(StateManagedTransaction::new); + } + + protected TwoPhaseCommitTransaction decorate(TwoPhaseCommitTransaction transaction) + throws TransactionException { + TwoPhaseCommitTransaction decorated = transaction; + for (TwoPhaseCommitTransactionDecorator transactionDecorator : transactionDecorators) { + decorated = transactionDecorator.decorate(decorated); + } + return decorated; + } + + @Override + public void addTransactionDecorator(TwoPhaseCommitTransactionDecorator transactionDecorator) { + transactionDecorators.add(transactionDecorator); + } + + /** + * This class is to unify the call sequence of the transaction object. It doesn't care about the + * potential inconsistency between the status field on JVM memory and the underlying persistent + * layer. + */ + @VisibleForTesting + static class StateManagedTransaction extends DecoratedTwoPhaseCommitTransaction { + + private enum Status { + ACTIVE, + PREPARED, + PREPARE_FAILED, + VALIDATED, + VALIDATION_FAILED, + COMMITTED, + COMMIT_FAILED, + ROLLED_BACK + } + + private Status status; + + @VisibleForTesting + StateManagedTransaction(TwoPhaseCommitTransaction transaction) { + super(transaction); + status = Status.ACTIVE; + } + + @Override + public Optional get(Get get) throws CrudException { + checkIfActive(); + return super.get(get); + } + + @Override + public List scan(Scan scan) throws CrudException { + checkIfActive(); + return super.scan(scan); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(Put put) throws CrudException { + checkIfActive(); + super.put(put); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(List puts) throws CrudException { + checkIfActive(); + super.put(puts); + } + + @Override + public void delete(Delete delete) throws CrudException { + checkIfActive(); + super.delete(delete); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void delete(List deletes) throws CrudException { + checkIfActive(); + super.delete(deletes); + } + + @Override + public void insert(Insert insert) throws CrudException { + checkIfActive(); + super.insert(insert); + } + + @Override + public void upsert(Upsert upsert) throws CrudException { + checkIfActive(); + super.upsert(upsert); + } + + @Override + public void update(Update update) throws CrudException { + checkIfActive(); + super.update(update); + } + + @Override + public void mutate(List mutations) throws CrudException { + checkIfActive(); + super.mutate(mutations); + } + + @Override + public void prepare() throws PreparationException { + checkIfActive(); + try { + super.prepare(); + status = Status.PREPARED; + } catch (Exception e) { + status = Status.PREPARE_FAILED; + throw e; + } + } + + @Override + public void validate() throws ValidationException { + if (status != Status.PREPARED) { + throw new IllegalStateException(CoreError.TRANSACTION_NOT_PREPARED.buildMessage(status)); + } + + try { + super.validate(); + status = Status.VALIDATED; + } catch (Exception e) { + status = Status.VALIDATION_FAILED; + throw e; + } + } + + @Override + public void commit() throws CommitException, UnknownTransactionStatusException { + if (status != Status.PREPARED && status != Status.VALIDATED) { + throw new IllegalStateException( + CoreError.TRANSACTION_NOT_PREPARED_OR_VALIDATED.buildMessage(status)); + } + + try { + super.commit(); + status = Status.COMMITTED; + } catch (Exception e) { + status = Status.COMMIT_FAILED; + throw e; + } + } + + @Override + public void rollback() throws RollbackException { + if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { + throw new IllegalStateException( + CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); + } + try { + super.rollback(); + } finally { + status = Status.ROLLED_BACK; + } + } + + @Override + public void abort() throws AbortException { + if (status == Status.COMMITTED || status == Status.ROLLED_BACK) { + throw new IllegalStateException( + CoreError.TRANSACTION_ALREADY_COMMITTED_OR_ROLLED_BACK.buildMessage(status)); + } + try { + super.abort(); + } finally { + status = Status.ROLLED_BACK; + } + } + + private void checkIfActive() { + if (status != Status.ACTIVE) { + throw new IllegalStateException(CoreError.TRANSACTION_NOT_ACTIVE.buildMessage(status)); + } + } + } +} diff --git a/core/src/main/java/com/scalar/db/service/TransactionService.java b/core/src/main/java/com/scalar/db/service/TransactionService.java index 27670093a7..fd68d2dc22 100644 --- a/core/src/main/java/com/scalar/db/service/TransactionService.java +++ b/core/src/main/java/com/scalar/db/service/TransactionService.java @@ -1,13 +1,26 @@ package com.scalar.db.service; import com.google.inject.Inject; +import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionManager; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; import com.scalar.db.api.Isolation; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; import com.scalar.db.api.SerializableStrategy; import com.scalar.db.api.TransactionState; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.TransactionNotFoundException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; import java.util.Optional; import javax.annotation.concurrent.ThreadSafe; @@ -17,6 +30,7 @@ public class TransactionService implements DistributedTransactionManager { private final DistributedTransactionManager manager; + @SuppressFBWarnings("EI_EXPOSE_REP2") @Inject public TransactionService(DistributedTransactionManager manager) { this.manager = manager; @@ -143,6 +157,57 @@ public TransactionState abort(String txId) throws TransactionException { return manager.abort(txId); } + @Override + public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { + return manager.get(get); + } + + @Override + public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { + return manager.scan(scan); + } + + @Override + public void put(Put put) throws CrudException, UnknownTransactionStatusException { + manager.put(put); + } + + @Override + public void put(List puts) throws CrudException, UnknownTransactionStatusException { + manager.put(puts); + } + + @Override + public void insert(Insert insert) throws CrudException, UnknownTransactionStatusException { + manager.insert(insert); + } + + @Override + public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatusException { + manager.upsert(upsert); + } + + @Override + public void update(Update update) throws CrudException, UnknownTransactionStatusException { + manager.update(update); + } + + @Override + public void delete(Delete delete) throws CrudException, UnknownTransactionStatusException { + manager.delete(delete); + } + + @Override + public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { + manager.delete(deletes); + } + + @Override + public void mutate(List mutations) + throws CrudException, UnknownTransactionStatusException { + manager.mutate(mutations); + } + @Override public void close() { manager.close(); diff --git a/core/src/main/java/com/scalar/db/service/TwoPhaseCommitTransactionService.java b/core/src/main/java/com/scalar/db/service/TwoPhaseCommitTransactionService.java index 3a709a85ef..a2a2439f2c 100644 --- a/core/src/main/java/com/scalar/db/service/TwoPhaseCommitTransactionService.java +++ b/core/src/main/java/com/scalar/db/service/TwoPhaseCommitTransactionService.java @@ -1,11 +1,24 @@ package com.scalar.db.service; import com.google.inject.Inject; +import com.scalar.db.api.Delete; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; import com.scalar.db.api.TransactionState; import com.scalar.db.api.TwoPhaseCommitTransaction; import com.scalar.db.api.TwoPhaseCommitTransactionManager; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.TransactionNotFoundException; +import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; import java.util.Optional; import javax.annotation.concurrent.Immutable; @@ -15,6 +28,7 @@ public class TwoPhaseCommitTransactionService implements TwoPhaseCommitTransactionManager { private final TwoPhaseCommitTransactionManager manager; + @SuppressFBWarnings("EI_EXPOSE_REP2") @Inject public TwoPhaseCommitTransactionService(TwoPhaseCommitTransactionManager manager) { this.manager = manager; @@ -100,6 +114,57 @@ public TransactionState abort(String txId) throws TransactionException { return manager.abort(txId); } + @Override + public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { + return manager.get(get); + } + + @Override + public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { + return manager.scan(scan); + } + + @Override + public void put(Put put) throws CrudException, UnknownTransactionStatusException { + manager.put(put); + } + + @Override + public void put(List puts) throws CrudException, UnknownTransactionStatusException { + manager.put(puts); + } + + @Override + public void insert(Insert insert) throws CrudException, UnknownTransactionStatusException { + manager.insert(insert); + } + + @Override + public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatusException { + manager.upsert(upsert); + } + + @Override + public void update(Update update) throws CrudException, UnknownTransactionStatusException { + manager.update(update); + } + + @Override + public void delete(Delete delete) throws CrudException, UnknownTransactionStatusException { + manager.delete(delete); + } + + @Override + public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { + manager.delete(deletes); + } + + @Override + public void mutate(List mutations) + throws CrudException, UnknownTransactionStatusException { + manager.mutate(mutations); + } + @Override public void close() { manager.close(); diff --git a/core/src/test/java/com/scalar/db/common/AbstractDistributedTransactionManagerTest.java b/core/src/test/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManagerTest.java similarity index 96% rename from core/src/test/java/com/scalar/db/common/AbstractDistributedTransactionManagerTest.java rename to core/src/test/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManagerTest.java index b766e41613..31356ec7a8 100644 --- a/core/src/test/java/com/scalar/db/common/AbstractDistributedTransactionManagerTest.java +++ b/core/src/test/java/com/scalar/db/common/TransactionDecorationDistributedTransactionManagerTest.java @@ -25,7 +25,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -public class AbstractDistributedTransactionManagerTest { +public class TransactionDecorationDistributedTransactionManagerTest { @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC") @SuppressWarnings("ClassCanBeStatic") @@ -34,7 +34,7 @@ public class StateManagedTransactionTest { @Mock private DistributedTransaction wrappedTransaction; - private AbstractDistributedTransactionManager.StateManagedTransaction transaction; + private TransactionDecorationDistributedTransactionManager.StateManagedTransaction transaction; @BeforeEach public void setUp() throws Exception { @@ -42,7 +42,8 @@ public void setUp() throws Exception { // Arrange transaction = - new AbstractDistributedTransactionManager.StateManagedTransaction(wrappedTransaction); + new TransactionDecorationDistributedTransactionManager.StateManagedTransaction( + wrappedTransaction); } @Test diff --git a/core/src/test/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManagerTest.java b/core/src/test/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManagerTest.java similarity index 97% rename from core/src/test/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManagerTest.java rename to core/src/test/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManagerTest.java index fa0e1dff79..24d2fd012b 100644 --- a/core/src/test/java/com/scalar/db/common/AbstractTwoPhaseCommitTransactionManagerTest.java +++ b/core/src/test/java/com/scalar/db/common/TransactionDecorationTwoPhaseCommitTransactionManagerTest.java @@ -27,7 +27,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -public class AbstractTwoPhaseCommitTransactionManagerTest { +public class TransactionDecorationTwoPhaseCommitTransactionManagerTest { @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC") @SuppressWarnings("ClassCanBeStatic") @@ -36,7 +36,8 @@ public class StateManagedTransactionTest { @Mock private TwoPhaseCommitTransaction wrappedTransaction; - private AbstractTwoPhaseCommitTransactionManager.StateManagedTransaction transaction; + private TransactionDecorationTwoPhaseCommitTransactionManager.StateManagedTransaction + transaction; @BeforeEach public void setUp() throws Exception { @@ -44,7 +45,8 @@ public void setUp() throws Exception { // Arrange transaction = - new AbstractTwoPhaseCommitTransactionManager.StateManagedTransaction(wrappedTransaction); + new TransactionDecorationTwoPhaseCommitTransactionManager.StateManagedTransaction( + wrappedTransaction); } @Test diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index b197cab0fd..4ce80a82c1 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -3,21 +3,39 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; import com.scalar.db.api.TransactionState; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; import com.scalar.db.common.DecoratedDistributedTransaction; import com.scalar.db.config.DatabaseConfig; +import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.CrudException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.TransactionNotFoundException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.io.Key; import com.scalar.db.transaction.consensuscommit.Coordinator.State; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -467,4 +485,414 @@ public void abort_CommitHandlerThrowsUnknownTransactionStatusException_ShouldRet // Assert assertThat(actual).isEqualTo(TransactionState.UNKNOWN); } + + @Test + public void get_ShouldGet() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + Result result = mock(Result.class); + when(transaction.get(get)).thenReturn(Optional.of(result)); + + // Act + Optional actual = spied.get(get); + + // Assert + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).commit(); + assertThat(actual).isEqualTo(Optional.of(result)); + } + + @Test + public void + get_TransactionNotFoundExceptionThrownByTransactionBegin_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + doThrow(TransactionNotFoundException.class).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + } + + @Test + public void get_TransactionExceptionThrownByTransactionBegin_ShouldThrowCrudException() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + doThrow(TransactionException.class).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + } + + @Test + public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + when(transaction.get(get)).thenThrow(CrudException.class); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_CommitConflictExceptionThrownByTransactionCommit_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(CommitConflictException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_UnknownTransactionStatusExceptionThrownByTransactionCommit_ShouldThrowUnknownTransactionStatusException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).commit(); + } + + @Test + public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(CommitException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).commit(); + } + + @Test + public void scan_ShouldScan() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + List results = + Arrays.asList(mock(Result.class), mock(Result.class), mock(Result.class)); + when(transaction.scan(scan)).thenReturn(results); + + // Act + List actual = spied.scan(scan); + + // Assert + verify(spied).begin(); + verify(transaction).scan(scan); + verify(transaction).commit(); + assertThat(actual).isEqualTo(results); + } + + @Test + public void put_ShouldPut() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Put put = + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.put(put); + + // Assert + verify(spied).begin(); + verify(transaction).put(put); + verify(transaction).commit(); + } + + @Test + public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List puts = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(), + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .intValue("col", 1) + .build(), + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .intValue("col", 2) + .build()); + + // Act + spied.put(puts); + + // Assert + verify(spied).begin(); + verify(transaction).put(puts); + verify(transaction).commit(); + } + + @Test + public void insert_ShouldInsert() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Insert insert = + Insert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.insert(insert); + + // Assert + verify(spied).begin(); + verify(transaction).insert(insert); + verify(transaction).commit(); + } + + @Test + public void upsert_ShouldUpsert() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Upsert upsert = + Upsert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.upsert(upsert); + + // Assert + verify(spied).begin(); + verify(transaction).upsert(upsert); + verify(transaction).commit(); + } + + @Test + public void update_ShouldUpdate() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Update update = + Update.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.update(update); + + // Assert + verify(spied).begin(); + verify(transaction).update(update); + verify(transaction).commit(); + } + + @Test + public void delete_ShouldDelete() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Delete delete = + Delete.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act + spied.delete(delete); + + // Assert + verify(spied).begin(); + verify(transaction).delete(delete); + verify(transaction).commit(); + } + + @Test + public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List deletes = + Arrays.asList( + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .build()); + + // Act + spied.delete(deletes); + + // Assert + verify(spied).begin(); + verify(transaction).delete(deletes); + verify(transaction).commit(); + } + + @Test + public void mutate_ShouldMutate() throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + ConsensusCommitManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List mutations = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(), + Insert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .intValue("col", 1) + .build(), + Upsert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .intValue("col", 2) + .build(), + Update.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 3)) + .intValue("col", 3) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 4)) + .build()); + + // Act + spied.mutate(mutations); + + // Assert + verify(spied).begin(); + verify(transaction).mutate(mutations); + verify(transaction).commit(); + } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java index cfda9509df..cc2d3d17d7 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java @@ -3,23 +3,43 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.scalar.db.api.Delete; import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedStorageAdmin; +import com.scalar.db.api.Get; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; import com.scalar.db.api.TransactionState; import com.scalar.db.api.TwoPhaseCommitTransaction; +import com.scalar.db.api.TwoPhaseCommitTransactionManager; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; import com.scalar.db.common.DecoratedTwoPhaseCommitTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CrudConflictException; +import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.RollbackException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.TransactionNotFoundException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.exception.transaction.ValidationConflictException; +import com.scalar.db.io.Key; import com.scalar.db.transaction.consensuscommit.Coordinator.State; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -442,4 +462,476 @@ public void abort_CommitHandlerThrowsUnknownTransactionStatusException_ShouldRet // Assert assertThat(actual).isEqualTo(TransactionState.UNKNOWN); } + + @Test + public void get_ShouldGet() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + Result result = mock(Result.class); + when(transaction.get(get)).thenReturn(Optional.of(result)); + + // Act + Optional actual = spied.get(get); + + // Assert + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + assertThat(actual).isEqualTo(Optional.of(result)); + } + + @Test + public void + get_TransactionNotFoundExceptionThrownByTransactionBegin_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransactionManager spied = spy(manager); + doThrow(TransactionNotFoundException.class).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + } + + @Test + public void get_TransactionExceptionThrownByTransactionBegin_ShouldThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransactionManager spied = spy(manager); + doThrow(TransactionException.class).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + } + + @Test + public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + when(transaction.get(get)).thenThrow(CrudException.class); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_PreparationConflictExceptionThrownByTransactionPrepare_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(PreparationConflictException.class).when(transaction).prepare(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_ValidationConflictExceptionThrownByTransactionValidate_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(ValidationConflictException.class).when(transaction).validate(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_CommitConflictExceptionThrownByTransactionCommit_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(CommitConflictException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).rollback(); + } + + @Test + public void + get_UnknownTransactionStatusExceptionThrownByTransactionCommit_ShouldThrowUnknownTransactionStatusException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).commit(); + } + + @Test + public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudException() + throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Get get = + Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + doThrow(CommitException.class).when(transaction).commit(); + + // Act Assert + assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).get(get); + verify(transaction).commit(); + } + + @Test + public void scan_ShouldScan() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + List results = + Arrays.asList(mock(Result.class), mock(Result.class), mock(Result.class)); + when(transaction.scan(scan)).thenReturn(results); + + // Act + List actual = spied.scan(scan); + + // Assert + verify(spied).begin(); + verify(transaction).scan(scan); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + assertThat(actual).isEqualTo(results); + } + + @Test + public void put_ShouldPut() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Put put = + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.put(put); + + // Assert + verify(spied).begin(); + verify(transaction).put(put); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List puts = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(), + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .intValue("col", 1) + .build(), + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .intValue("col", 2) + .build()); + // Act + spied.put(puts); + + // Assert + verify(spied).begin(); + verify(transaction).put(puts); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void insert_ShouldInsert() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Insert insert = + Insert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.insert(insert); + + // Assert + verify(spied).begin(); + verify(transaction).insert(insert); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void upsert_ShouldUpsert() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Upsert upsert = + Upsert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.upsert(upsert); + + // Assert + verify(spied).begin(); + verify(transaction).upsert(upsert); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void update_ShouldUpdate() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Update update = + Update.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(); + + // Act + spied.update(update); + + // Assert + verify(spied).begin(); + verify(transaction).update(update); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void delete_ShouldDelete() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Delete delete = + Delete.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); + + // Act + spied.delete(delete); + + // Assert + verify(spied).begin(); + verify(transaction).delete(delete); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List deletes = + Arrays.asList( + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .build()); + // Act + spied.delete(deletes); + + // Assert + verify(spied).begin(); + verify(transaction).delete(deletes); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } + + @Test + public void mutate_ShouldMutate() throws TransactionException { + // Arrange + TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); + + TwoPhaseCommitTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + List mutations = + Arrays.asList( + Put.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 0)) + .intValue("col", 0) + .build(), + Insert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 1)) + .intValue("col", 1) + .build(), + Upsert.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 2)) + .intValue("col", 2) + .build(), + Update.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 3)) + .intValue("col", 3) + .build(), + Delete.newBuilder() + .namespace("ns") + .table("tbl") + .partitionKey(Key.ofInt("pk", 4)) + .build()); + + // Act + spied.mutate(mutations); + + // Assert + verify(spied).begin(); + verify(transaction).mutate(mutations); + verify(transaction).prepare(); + verify(transaction).validate(); + verify(transaction).commit(); + } } diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java index 4c06f55360..bdf0a01bd7 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java @@ -1025,9 +1025,8 @@ public void resume_WithBeginningAndCommittingTransaction_ShouldThrowTransactionN public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws TransactionException { Properties properties = getProperties(getTestName()); properties.put(DatabaseConfig.DEFAULT_NAMESPACE_NAME, namespace); - final DistributedTransactionManager managerWithDefaultNamespace = - TransactionFactory.create(properties).getTransactionManager(); - try { + try (DistributedTransactionManager managerWithDefaultNamespace = + TransactionFactory.create(properties).getTransactionManager()) { // Arrange populateRecords(); Get get = @@ -1043,6 +1042,28 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Insert insert = + Insert.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Upsert upsert = + Upsert.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 5)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Update update = + Update.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) .build(); Delete delete = Delete.newBuilder() @@ -1056,6 +1077,7 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) .intValue(BALANCE, 300) + .enableImplicitPreRead() .build(); Mutation deleteAsMutation2 = Delete.newBuilder() @@ -1071,15 +1093,14 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact tx.get(get); tx.scan(scan); tx.put(put); + tx.insert(insert); + tx.upsert(upsert); + tx.update(update); tx.delete(delete); tx.mutate(ImmutableList.of(putAsMutation1, deleteAsMutation2)); tx.commit(); }) .doesNotThrowAnyException(); - } finally { - if (managerWithDefaultNamespace != null) { - managerWithDefaultNamespace.close(); - } } } @@ -1666,6 +1687,348 @@ public void update_withUpdateIfWithNonVerifiedCondition_shouldThrowUnsatisfiedCo assertThat(result.isNull(SOME_COLUMN)).isTrue(); } + @Test + public void manager_get_GetGivenForCommittedRecord_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(); + Get get = prepareGet(0, 0); + + // Act + Optional result = manager.get(get); + + // Assert + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + assertThat(result.get().getInt(SOME_COLUMN)).isEqualTo(0); + } + + @Test + public void manager_scan_ScanGivenForCommittedRecord_ShouldReturnRecords() + throws TransactionException { + // Arrange + populateRecords(); + Scan scan = prepareScan(1, 0, 2); + + // Act + List results = manager.scan(scan); + + // Assert + assertThat(results.size()).isEqualTo(3); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(0).getInt(SOME_COLUMN)).isEqualTo(0); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(SOME_COLUMN)).isEqualTo(1); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(2).getInt(SOME_COLUMN)).isEqualTo(2); + } + + @Test + public void manager_put_PutGivenForNonExisting_ShouldCreateRecord() throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Put put = + Put.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager.put(put); + + // Assert + Get get = prepareGet(0, 0); + Optional result = manager.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_put_PutGivenForExisting_ShouldUpdateRecord() throws TransactionException { + // Arrange + populateRecords(); + + // Act + int expected = INITIAL_BALANCE + 100; + Put put = + Put.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .enableImplicitPreRead() + .build(); + manager.put(put); + + // Assert + Optional actual = manager.get(prepareGet(0, 0)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_insert_InsertGivenForNonExisting_ShouldCreateRecord() + throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Insert insert = + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager.insert(insert); + + // Assert + Get get = prepareGet(0, 0); + Optional result = manager.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_insert_InsertGivenForExisting_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + populateRecords(); + + // Act Assert + int expected = INITIAL_BALANCE + 100; + Insert insert = + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + assertThatThrownBy(() -> manager.insert(insert)).isInstanceOf(CrudConflictException.class); + } + + @Test + public void manager_upsert_UpsertGivenForNonExisting_ShouldCreateRecord() + throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager.upsert(upsert); + + // Assert + Get get = prepareGet(0, 0); + Optional result = manager.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_upsert_UpsertGivenForExisting_ShouldUpdateRecord() + throws TransactionException { + // Arrange + populateRecords(); + + // Act + int expected = INITIAL_BALANCE + 100; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + manager.upsert(upsert); + + // Assert + Optional actual = manager.get(prepareGet(0, 0)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_update_UpdateGivenForNonExisting_ShouldDoNothing() + throws TransactionException { + // Arrange + Update update = + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(); + + // Act + assertThatCode(() -> manager.update(update)).doesNotThrowAnyException(); + + // Assert + Optional actual = manager.get(prepareGet(0, 0)); + + assertThat(actual).isEmpty(); + } + + @Test + public void manager_update_UpdateGivenForExisting_ShouldUpdateRecord() + throws TransactionException { + // Arrange + populateRecords(); + + // Act + int expected = INITIAL_BALANCE + 100; + Update update = + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + manager.update(update); + + // Assert + Optional actual = manager.get(prepareGet(0, 0)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_delete_DeleteGivenForExisting_ShouldDeleteRecord() + throws TransactionException { + // Arrange + populateRecords(); + Delete delete = prepareDelete(0, 0); + + // Act + manager.delete(delete); + + // Assert + Optional result = manager.get(prepareGet(0, 0)); + + assertThat(result.isPresent()).isFalse(); + } + + @Test + public void manager_operation_DefaultNamespaceGiven_ShouldWorkProperly() + throws TransactionException { + Properties properties = getProperties(getTestName()); + properties.put(DatabaseConfig.DEFAULT_NAMESPACE_NAME, namespace); + try (DistributedTransactionManager managerWithDefaultNamespace = + TransactionFactory.create(properties).getTransactionManager()) { + // Arrange + populateRecords(); + Get get = + Get.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + Scan scan = Scan.newBuilder().table(TABLE).all().build(); + Put put = + Put.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Delete delete = + Delete.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + Insert insert = + Insert.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Upsert upsert = + Upsert.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 5)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Update update = + Update.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Mutation putAsMutation1 = + Put.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Mutation deleteAsMutation2 = + Delete.newBuilder() + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .build(); + + // Act Assert + Assertions.assertThatCode(() -> managerWithDefaultNamespace.get(get)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.scan(scan)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.put(put)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.insert(insert)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.upsert(upsert)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.update(update)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> managerWithDefaultNamespace.delete(delete)) + .doesNotThrowAnyException(); + Assertions.assertThatCode( + () -> + managerWithDefaultNamespace.mutate( + ImmutableList.of(putAsMutation1, deleteAsMutation2))) + .doesNotThrowAnyException(); + } + } + protected Optional get(Get get) throws TransactionException { DistributedTransaction tx = manager.start(); try { diff --git a/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java index c720f36477..4494224338 100644 --- a/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java @@ -1271,9 +1271,8 @@ public void resume_WithBeginningAndCommittingTransaction_ShouldThrowTransactionN public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws TransactionException { Properties properties = getProperties1(getTestName()); properties.put(DatabaseConfig.DEFAULT_NAMESPACE_NAME, namespace1); - final TwoPhaseCommitTransactionManager manager1WithDefaultNamespace = - TransactionFactory.create(properties).getTwoPhaseCommitTransactionManager(); - try { + try (TwoPhaseCommitTransactionManager manager1WithDefaultNamespace = + TransactionFactory.create(properties).getTwoPhaseCommitTransactionManager()) { // Arrange populateRecords(manager1WithDefaultNamespace, namespace1, TABLE_1); Get get = @@ -1289,6 +1288,28 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Insert insert = + Insert.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Upsert upsert = + Upsert.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 5)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Update update = + Update.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) .build(); Delete delete = Delete.newBuilder() @@ -1302,6 +1323,7 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) .intValue(BALANCE, 300) + .enableImplicitPreRead() .build(); Mutation deleteAsMutation2 = Delete.newBuilder() @@ -1317,6 +1339,9 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact tx.get(get); tx.scan(scan); tx.put(put); + tx.insert(insert); + tx.upsert(upsert); + tx.update(update); tx.delete(delete); tx.mutate(ImmutableList.of(putAsMutation1, deleteAsMutation2)); tx.prepare(); @@ -1324,10 +1349,6 @@ public void operation_DefaultNamespaceGiven_ShouldWorkProperly() throws Transact tx.commit(); }) .doesNotThrowAnyException(); - } finally { - if (manager1WithDefaultNamespace != null) { - manager1WithDefaultNamespace.close(); - } } } @@ -1957,6 +1978,348 @@ public void update_withUpdateIfWithNonVerifiedCondition_shouldThrowUnsatisfiedCo assertThat(result.isNull(SOME_COLUMN)).isTrue(); } + @Test + public void manager_get_GetGivenForCommittedRecord_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act + Optional result = manager1.get(get); + + // Assert + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + assertThat(result.get().getInt(SOME_COLUMN)).isEqualTo(0); + } + + @Test + public void manager_scan_ScanGivenForCommittedRecord_ShouldReturnRecords() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + Scan scan = prepareScan(1, 0, 2, namespace1, TABLE_1); + + // Act + List results = manager1.scan(scan); + + // Assert + assertThat(results.size()).isEqualTo(3); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(0).getInt(SOME_COLUMN)).isEqualTo(0); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(1).getInt(SOME_COLUMN)).isEqualTo(1); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + assertThat(results.get(2).getInt(SOME_COLUMN)).isEqualTo(2); + } + + @Test + public void manager_put_PutGivenForNonExisting_ShouldCreateRecord() throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager1.put(put); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = manager1.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_put_PutGivenForExisting_ShouldUpdateRecord() throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .enableImplicitPreRead() + .build(); + manager1.put(put); + + // Assert + Optional actual = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_insert_InsertGivenForNonExisting_ShouldCreateRecord() + throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Insert insert = + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager1.insert(insert); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = manager1.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_insert_InsertGivenForExisting_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + + int expected = INITIAL_BALANCE + 100; + Insert insert = + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act Assert + assertThatThrownBy(() -> manager1.insert(insert)).isInstanceOf(CrudConflictException.class); + } + + @Test + public void manager_upsert_UpsertGivenForNonExisting_ShouldCreateRecord() + throws TransactionException { + // Arrange + int expected = INITIAL_BALANCE; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + manager1.upsert(upsert); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = manager1.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + + @Test + public void manager_upsert_UpsertGivenForExisting_ShouldUpdateRecord() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + manager1.upsert(upsert); + + // Assert + Optional actual = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_update_UpdateGivenForNonExisting_ShouldDoNothing() + throws TransactionException { + // Arrange + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(); + + // Act + assertThatCode(() -> manager1.update(update)).doesNotThrowAnyException(); + + // Assert + Optional actual = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual).isEmpty(); + } + + @Test + public void manager_update_UpdateGivenForExisting_ShouldUpdateRecord() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + manager1.update(update); + + // Assert + Optional actual = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + + @Test + public void manager_delete_DeleteGivenForExisting_ShouldDeleteRecord() + throws TransactionException { + // Arrange + populateRecords(manager1, namespace1, TABLE_1); + Delete delete = prepareDelete(0, 0, namespace1, TABLE_1); + + // Act + manager1.delete(delete); + + // Assert + Optional result = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(result.isPresent()).isFalse(); + } + + @Test + public void manager_operation_DefaultNamespaceGiven_ShouldWorkProperly() + throws TransactionException { + Properties properties = getProperties1(getTestName()); + properties.put(DatabaseConfig.DEFAULT_NAMESPACE_NAME, namespace1); + try (TwoPhaseCommitTransactionManager manager1WithDefaultNamespace = + TransactionFactory.create(properties).getTwoPhaseCommitTransactionManager()) { + // Arrange + populateRecords(manager1WithDefaultNamespace, namespace1, TABLE_1); + Get get = + Get.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + Scan scan = Scan.newBuilder().table(TABLE_1).all().build(); + Put put = + Put.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Insert insert = + Insert.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 4)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Upsert upsert = + Upsert.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 5)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Update update = + Update.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .build(); + Delete delete = + Delete.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 2)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .build(); + Mutation putAsMutation1 = + Put.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 300) + .enableImplicitPreRead() + .build(); + Mutation deleteAsMutation2 = + Delete.newBuilder() + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 3)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .build(); + + // Act Assert + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.get(get)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.scan(scan)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.put(put)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.insert(insert)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.upsert(upsert)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.update(update)) + .doesNotThrowAnyException(); + Assertions.assertThatCode(() -> manager1WithDefaultNamespace.delete(delete)) + .doesNotThrowAnyException(); + Assertions.assertThatCode( + () -> + manager1WithDefaultNamespace.mutate( + ImmutableList.of(putAsMutation1, deleteAsMutation2))) + .doesNotThrowAnyException(); + } + } + private Optional get(Get get) throws TransactionException { TwoPhaseCommitTransaction tx = manager1.start(); try { @@ -1998,9 +2361,9 @@ private void delete(Delete delete) throws TransactionException { } protected void populateRecords( - TwoPhaseCommitTransactionManager manager, String namespaceName, String tableName) + TwoPhaseCommitTransactionManager manager1, String namespaceName, String tableName) throws TransactionException { - TwoPhaseCommitTransaction transaction = manager.begin(); + TwoPhaseCommitTransaction transaction = manager1.begin(); IntStream.range(0, NUM_ACCOUNTS) .forEach( i -> diff --git a/server/src/integration-test/java/com/scalar/db/server/ConsensusCommitIntegrationTestWithServer.java b/server/src/integration-test/java/com/scalar/db/server/ConsensusCommitIntegrationTestWithServer.java index 5bcc946adf..a0b8b4442c 100644 --- a/server/src/integration-test/java/com/scalar/db/server/ConsensusCommitIntegrationTestWithServer.java +++ b/server/src/integration-test/java/com/scalar/db/server/ConsensusCommitIntegrationTestWithServer.java @@ -43,6 +43,11 @@ public void afterAll() throws Exception { } } + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void operation_DefaultNamespaceGiven_ShouldWorkProperly() {} + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") @Override @Test @@ -101,4 +106,39 @@ public void update_withUpdateIfWithVerifiedCondition_shouldUpdateProperly() {} @Test public void update_withUpdateIfWithNonVerifiedCondition_shouldThrowUnsatisfiedConditionException() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_insert_InsertGivenForNonExisting_ShouldCreateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_insert_InsertGivenForExisting_ShouldThrowCrudConflictException() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_upsert_UpsertGivenForNonExisting_ShouldCreateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_upsert_UpsertGivenForExisting_ShouldUpdateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_update_UpdateGivenForExisting_ShouldUpdateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_update_UpdateGivenForNonExisting_ShouldDoNothing() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_operation_DefaultNamespaceGiven_ShouldWorkProperly() {} } diff --git a/server/src/integration-test/java/com/scalar/db/server/TwoPhaseConsensusCommitIntegrationTestWithServer.java b/server/src/integration-test/java/com/scalar/db/server/TwoPhaseConsensusCommitIntegrationTestWithServer.java index a787559c7c..bafa126e42 100644 --- a/server/src/integration-test/java/com/scalar/db/server/TwoPhaseConsensusCommitIntegrationTestWithServer.java +++ b/server/src/integration-test/java/com/scalar/db/server/TwoPhaseConsensusCommitIntegrationTestWithServer.java @@ -60,6 +60,11 @@ public void afterAll() throws Exception { } } + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void operation_DefaultNamespaceGiven_ShouldWorkProperly() {} + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") @Override @Test @@ -118,4 +123,39 @@ public void update_withUpdateIfWithVerifiedCondition_shouldUpdateProperly() {} @Test public void update_withUpdateIfWithNonVerifiedCondition_shouldThrowUnsatisfiedConditionException() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_insert_InsertGivenForNonExisting_ShouldCreateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_insert_InsertGivenForExisting_ShouldThrowCrudConflictException() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_upsert_UpsertGivenForNonExisting_ShouldCreateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_upsert_UpsertGivenForExisting_ShouldUpdateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_update_UpdateGivenForExisting_ShouldUpdateRecord() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_update_UpdateGivenForNonExisting_ShouldDoNothing() {} + + @Disabled("ScalarDB Server doesn't support insert(), upsert(), and update()") + @Override + @Test + public void manager_operation_DefaultNamespaceGiven_ShouldWorkProperly() {} } diff --git a/server/src/main/java/com/scalar/db/server/DistributedTransactionService.java b/server/src/main/java/com/scalar/db/server/DistributedTransactionService.java index f998287bdd..988b08a64a 100644 --- a/server/src/main/java/com/scalar/db/server/DistributedTransactionService.java +++ b/server/src/main/java/com/scalar/db/server/DistributedTransactionService.java @@ -36,6 +36,7 @@ import com.scalar.db.rpc.TransactionResponse.StartResponse; import com.scalar.db.util.ProtoUtils; import com.scalar.db.util.ThrowableRunnable; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.Collections; @@ -59,6 +60,7 @@ public class DistributedTransactionService private final GateKeeper gateKeeper; private final Metrics metrics; + @SuppressFBWarnings("EI_EXPOSE_REP2") public DistributedTransactionService( DistributedTransactionManager manager, TableMetadataManager tableMetadataManager, diff --git a/server/src/main/java/com/scalar/db/server/TwoPhaseCommitTransactionService.java b/server/src/main/java/com/scalar/db/server/TwoPhaseCommitTransactionService.java index 8b295a7649..fcbad59363 100644 --- a/server/src/main/java/com/scalar/db/server/TwoPhaseCommitTransactionService.java +++ b/server/src/main/java/com/scalar/db/server/TwoPhaseCommitTransactionService.java @@ -41,6 +41,7 @@ import com.scalar.db.rpc.TwoPhaseCommitTransactionResponse.StartResponse; import com.scalar.db.util.ProtoUtils; import com.scalar.db.util.ThrowableRunnable; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.Collections; @@ -65,6 +66,7 @@ public class TwoPhaseCommitTransactionService private final GateKeeper gateKeeper; private final Metrics metrics; + @SuppressFBWarnings("EI_EXPOSE_REP2") public TwoPhaseCommitTransactionService( TwoPhaseCommitTransactionManager manager, TableMetadataManager tableMetadataManager,