From 2d137a79a29a99f9e6d103aa79ebe6f5b38df359 Mon Sep 17 00:00:00 2001 From: Tobias Hafner Date: Wed, 4 Dec 2024 12:06:25 +0100 Subject: [PATCH] Adjust isolation test and bugfix --- ...currencyTests.java => IsolationTests.java} | 134 ++++++++++-------- .../org/polypheny/db/transaction/Session.java | 28 +++- 2 files changed, 97 insertions(+), 65 deletions(-) rename dbms/src/test/java/org/polypheny/db/transaction/{ConcurrencyTests.java => IsolationTests.java} (82%) diff --git a/dbms/src/test/java/org/polypheny/db/transaction/ConcurrencyTests.java b/dbms/src/test/java/org/polypheny/db/transaction/IsolationTests.java similarity index 82% rename from dbms/src/test/java/org/polypheny/db/transaction/ConcurrencyTests.java rename to dbms/src/test/java/org/polypheny/db/transaction/IsolationTests.java index d7a610ed4e..63ca5fae26 100644 --- a/dbms/src/test/java/org/polypheny/db/transaction/ConcurrencyTests.java +++ b/dbms/src/test/java/org/polypheny/db/transaction/IsolationTests.java @@ -30,7 +30,7 @@ import org.polypheny.db.TestHelper; import org.polypheny.db.processing.ImplementationContext.ExecutedContext; -public class ConcurrencyTests { +public class IsolationTests { private static TestHelper testHelper; @@ -105,18 +105,18 @@ public void dirtyReadSimple() throws ExecutionException, InterruptedException, T Session session2 = new Session( testHelper ); session1.startTransaction(); - session1.executeStatementIgnoreResult( + session1.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = 200 WHERE id = 1;", "sql" ); session2.startTransaction(); - Future> futureResult = session2.executeStatement( + Future> futureResult = session2.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); - session1.rollbackTransaction(); + session1.rollbackTransactionAsync(); List results = futureResult.get( 1, TimeUnit.MINUTES ); assertEquals( 1, results.size() ); @@ -124,7 +124,7 @@ public void dirtyReadSimple() throws ExecutionException, InterruptedException, T assertEquals( 100, balance ); closeAndIgnore( results ); - session2.commitTransaction(); + session2.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); @@ -141,7 +141,7 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T Session session2 = new Session( testHelper ); session1.startTransaction(); - Future> futureFirstRead = session1.executeStatement( + Future> futureFirstRead = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); @@ -153,13 +153,13 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T closeAndIgnore( firstRead ); session2.startTransaction(); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = 300 WHERE id = 1;", "sql" ); - session2.commitTransaction(); + session2.commitTransactionAsync(); - Future> futureSecondRead = session1.executeStatement( + Future> futureSecondRead = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); @@ -170,7 +170,7 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T assertEquals( 100, secondBalance ); closeAndIgnore( secondRead ); - session1.commitTransaction(); + session1.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); @@ -187,7 +187,7 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim Session session2 = new Session( testHelper ); session1.startTransaction(); - Future> futureFirstRead = session1.executeStatement( + Future> futureFirstRead = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE balance > 150;", "sql" ); @@ -198,13 +198,13 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim closeAndIgnore( firstRead ); session2.startTransaction(); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "INSERT INTO accounts (id, balance) VALUES (3, 200);", "sql" ); - session2.commitTransaction(); + session2.commitTransactionAsync(); - Future> futureSecondRead = session1.executeStatement( + Future> futureSecondRead = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE balance > 150;", "sql" ); @@ -214,7 +214,7 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim assertFalse( secondRead.get( 0 ).getIterator().hasMoreRows() ); closeAndIgnore( secondRead ); - session1.commitTransaction(); + session1.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); @@ -231,27 +231,27 @@ public void dirtyWriteSimple() throws InterruptedException, ExecutionException, Session session2 = new Session( testHelper ); session1.startTransaction(); - session1.executeStatementIgnoreResult( + Future> futureFirst = session1.executeStatementAsync( "UPDATE accounts SET balance = 250 WHERE id = 1;", "sql" ); - - session1.commitTransaction(); + closeAndIgnore( futureFirst ); session2.startTransaction(); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = 300 WHERE id = 1;", "sql" ); - session2.commitTransaction(); + session1.commitTransactionAsync(); + session2.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); Session validator = new Session( testHelper ); validator.startTransaction(); - Future> futureValidation = validator.executeStatement( + Future> futureValidation = validator.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); @@ -261,7 +261,7 @@ public void dirtyWriteSimple() throws InterruptedException, ExecutionException, assertEquals( 300, balance ); closeAndIgnore( validation ); - validator.commitTransaction(); + validator.commitTransactionAsync(); validator.awaitCompletion(); dropTables(); @@ -276,31 +276,31 @@ public void lostUpdateSimple() throws InterruptedException, ExecutionException, Session session2 = new Session( testHelper ); session1.startTransaction(); - Future> firstSelect = session1.executeStatement( + Future> firstSelect = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); closeAndIgnore( firstSelect ); // done manually as this forces the first statement to be completed before session 2 starts session2.startTransaction(); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = 200 WHERE id = 1;", "sql" ); - session2.commitTransaction(); + session2.commitTransactionAsync(); - session1.executeStatementIgnoreResult( + session1.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = 300 WHERE id = 1;", "sql" ); - session1.commitTransaction(); + session1.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); Session validator = new Session( testHelper ); validator.startTransaction(); - Future> futureValidation = validator.executeStatement( + Future> futureValidation = validator.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1;", "sql" ); @@ -310,7 +310,7 @@ public void lostUpdateSimple() throws InterruptedException, ExecutionException, assertEquals( 200, balance ); closeAndIgnore( validation ); - validator.commitTransaction(); + validator.commitTransactionAsync(); validator.awaitCompletion(); dropTables(); @@ -325,7 +325,7 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti Session session2 = new Session( testHelper ); session1.startTransaction(); - Future> futureReadX = session1.executeStatement( + Future> futureReadX = session1.executeStatementAsync( "SELECT x FROM coordinates WHERE id = 1;", "sql" ); @@ -337,13 +337,13 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti closeAndIgnore( readX ); session2.startTransaction(); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "UPDATE coordinates SET x = 300, y = 400 WHERE id = 1;", "sql" ); - session2.commitTransaction(); + session2.commitTransactionAsync(); - Future> futureReadY = session1.executeStatement( + Future> futureReadY = session1.executeStatementAsync( "SELECT y FROM coordinates WHERE id = 1;", "sql" ); @@ -354,7 +354,7 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti assertEquals( 200, yCoordinate ); closeAndIgnore( readY ); - session1.commitTransaction(); + session1.commitTransactionAsync(); session1.awaitCompletion(); session2.awaitCompletion(); @@ -370,61 +370,73 @@ public void writeSkewSimple() throws ExecutionException, InterruptedException, T Session session1 = new Session( testHelper ); Session session2 = new Session( testHelper ); + // This class represents transaction logic required for this test. In presence of stored procedures those could be used instead. + class Checker { + + public void checkBalance1( List result ) { + int totalBalance = result.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue(); + if ( totalBalance < 0 ) { + session1.rollbackTransaction(); + } else { + session1.commitTransaction(); + } + } + + + public void checkBalance2( List result ) { + int totalBalance = result.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue(); + if ( totalBalance < 0 ) { + session2.rollbackTransaction(); + } else { + session2.commitTransaction(); + } + } + + } + Checker checker = new Checker(); + session1.startTransaction(); - Future> futureRead1 = session1.executeStatement( + Future> futureRead1 = session1.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1 OR id = 2;", "sql" ); - closeAndIgnore( futureRead1 ); session2.startTransaction(); - Future> futureRead2 = session2.executeStatement( + Future> futureRead2 = session2.executeStatementAsync( "SELECT balance FROM accounts WHERE id = 1 OR id = 2;", "sql" ); + + closeAndIgnore( futureRead1 ); closeAndIgnore( futureRead2 ); - session1.executeStatementIgnoreResult( + session1.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = balance - 200 WHERE id = 1;", "sql" ); - Future> futureCheck1 = session1.executeStatement( + session1.executeStatementAndProcessAsync( "SELECT SUM(balance) AS total_balance FROM accounts;", - "sql" + "sql", + checker::checkBalance1 + ); - session2.executeStatementIgnoreResult( + session2.executeStatementIgnoreResultAsync( "UPDATE accounts SET balance = balance - 200 WHERE id = 2;", "sql" ); - Future> futureCheck2 = session2.executeStatement( + session2.executeStatementAndProcessAsync( "SELECT SUM(balance) AS total_balance FROM accounts;", - "sql" + "sql", + checker::checkBalance2 ); - - List result1 = futureCheck1.get( 1, TimeUnit.HOURS ); - int totalBalance1 = result1.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue(); - if ( totalBalance1 < 0 ) { - session1.rollbackTransaction(); - } else { - session1.commitTransaction(); - } - - List result2 = futureCheck2.get( 1, TimeUnit.HOURS ); - int totalBalance2 = result2.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue(); - if ( totalBalance2 < 0 ) { - session2.rollbackTransaction(); - } else { - session2.commitTransaction(); - } - session1.awaitCompletion(); session2.awaitCompletion(); Session validator = new Session( testHelper ); validator.startTransaction(); - Future> futureValidation = validator.executeStatement( + Future> futureValidation = validator.executeStatementAsync( "SELECT SUM(balance) AS total_balance FROM accounts;", "sql" ); diff --git a/dbms/src/test/java/org/polypheny/db/transaction/Session.java b/dbms/src/test/java/org/polypheny/db/transaction/Session.java index 388efa8c65..86c3c6ce3a 100644 --- a/dbms/src/test/java/org/polypheny/db/transaction/Session.java +++ b/dbms/src/test/java/org/polypheny/db/transaction/Session.java @@ -5,6 +5,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import lombok.Getter; import org.polypheny.db.TestHelper; import org.polypheny.db.processing.ImplementationContext.ExecutedContext; @@ -29,14 +30,23 @@ boolean awaitCompletion() throws InterruptedException { } - Future> executeStatement( String query, String languageName ) { + Future> executeStatementAsync( String query, String languageName ) { if ( executorService.isShutdown() || executorService.isTerminated() ) { executorService = Executors.newSingleThreadExecutor(); } return executorService.submit( () -> ConcurrencyTestUtils.executeStatement( query, languageName, transaction, testHelper ) ); } - void executeStatementIgnoreResult( String query, String languageName ) { + + void executeStatementAndProcessAsync( String query, String languageName, Consumer> consumer ) { + if ( executorService.isShutdown() || executorService.isTerminated() ) { + executorService = Executors.newSingleThreadExecutor(); + } + executorService.submit( () -> consumer.accept( ConcurrencyTestUtils.executeStatement( query, languageName, transaction, testHelper ) ) ); + } + + + void executeStatementIgnoreResultAsync( String query, String languageName ) { if ( executorService.isShutdown() || executorService.isTerminated() ) { executorService = Executors.newSingleThreadExecutor(); } @@ -49,13 +59,23 @@ void startTransaction() { } + void commitTransactionAsync() { + executorService.submit( () -> transaction.commit() ); + } + + + void rollbackTransactionAsync() { + executorService.submit( () -> transaction.rollback( "Requested by TestCase." ) ); + } + + void commitTransaction() { - executorService.submit(() -> transaction.commit()); + transaction.commit(); } void rollbackTransaction() { - executorService.submit(() -> transaction.rollback( "Requested by TestCase." )); + transaction.rollback( "Requested by TestCase." ); } }