Skip to content

Commit

Permalink
Adjust isolation test and bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Hafner committed Dec 4, 2024
1 parent e0add45 commit 2d137a7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.polypheny.db.TestHelper;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;

public class ConcurrencyTests {
public class IsolationTests {

private static TestHelper testHelper;

Expand Down Expand Up @@ -105,26 +105,26 @@ public void dirtyReadSimple() throws ExecutionException, InterruptedException, T
Session session2 = new Session( testHelper );

session1.startTransaction();
session1.executeStatementIgnoreResult(
session1.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = 200 WHERE id = 1;",
"sql"
);

session2.startTransaction();
Future<List<ExecutedContext>> futureResult = session2.executeStatement(
Future<List<ExecutedContext>> futureResult = session2.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);

session1.rollbackTransaction();
session1.rollbackTransactionAsync();

List<ExecutedContext> results = futureResult.get( 1, TimeUnit.MINUTES );
assertEquals( 1, results.size() );
int balance = results.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue();
assertEquals( 100, balance );
closeAndIgnore( results );

session2.commitTransaction();
session2.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();
Expand All @@ -141,7 +141,7 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T
Session session2 = new Session( testHelper );

session1.startTransaction();
Future<List<ExecutedContext>> futureFirstRead = session1.executeStatement(
Future<List<ExecutedContext>> futureFirstRead = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);
Expand All @@ -153,13 +153,13 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T
closeAndIgnore( firstRead );

session2.startTransaction();
session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = 300 WHERE id = 1;",
"sql"
);
session2.commitTransaction();
session2.commitTransactionAsync();

Future<List<ExecutedContext>> futureSecondRead = session1.executeStatement(
Future<List<ExecutedContext>> futureSecondRead = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);
Expand All @@ -170,7 +170,7 @@ public void fuzzyReadSimple() throws ExecutionException, InterruptedException, T
assertEquals( 100, secondBalance );
closeAndIgnore( secondRead );

session1.commitTransaction();
session1.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();
Expand All @@ -187,7 +187,7 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim
Session session2 = new Session( testHelper );

session1.startTransaction();
Future<List<ExecutedContext>> futureFirstRead = session1.executeStatement(
Future<List<ExecutedContext>> futureFirstRead = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE balance > 150;",
"sql"
);
Expand All @@ -198,13 +198,13 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim
closeAndIgnore( firstRead );

session2.startTransaction();
session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"INSERT INTO accounts (id, balance) VALUES (3, 200);",
"sql"
);
session2.commitTransaction();
session2.commitTransactionAsync();

Future<List<ExecutedContext>> futureSecondRead = session1.executeStatement(
Future<List<ExecutedContext>> futureSecondRead = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE balance > 150;",
"sql"
);
Expand All @@ -214,7 +214,7 @@ public void phantomSimple() throws ExecutionException, InterruptedException, Tim
assertFalse( secondRead.get( 0 ).getIterator().hasMoreRows() );
closeAndIgnore( secondRead );

session1.commitTransaction();
session1.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();
Expand All @@ -231,27 +231,27 @@ public void dirtyWriteSimple() throws InterruptedException, ExecutionException,
Session session2 = new Session( testHelper );

session1.startTransaction();
session1.executeStatementIgnoreResult(
Future<List<ExecutedContext>> futureFirst = session1.executeStatementAsync(
"UPDATE accounts SET balance = 250 WHERE id = 1;",
"sql"
);

session1.commitTransaction();
closeAndIgnore( futureFirst );

session2.startTransaction();
session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = 300 WHERE id = 1;",
"sql"
);

session2.commitTransaction();
session1.commitTransactionAsync();
session2.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();

Session validator = new Session( testHelper );
validator.startTransaction();
Future<List<ExecutedContext>> futureValidation = validator.executeStatement(
Future<List<ExecutedContext>> futureValidation = validator.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);
Expand All @@ -261,7 +261,7 @@ public void dirtyWriteSimple() throws InterruptedException, ExecutionException,
assertEquals( 300, balance );
closeAndIgnore( validation );

validator.commitTransaction();
validator.commitTransactionAsync();
validator.awaitCompletion();

dropTables();
Expand All @@ -276,31 +276,31 @@ public void lostUpdateSimple() throws InterruptedException, ExecutionException,
Session session2 = new Session( testHelper );

session1.startTransaction();
Future<List<ExecutedContext>> firstSelect = session1.executeStatement(
Future<List<ExecutedContext>> firstSelect = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);
closeAndIgnore( firstSelect ); // done manually as this forces the first statement to be completed before session 2 starts

session2.startTransaction();
session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = 200 WHERE id = 1;",
"sql"
);
session2.commitTransaction();
session2.commitTransactionAsync();

session1.executeStatementIgnoreResult(
session1.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = 300 WHERE id = 1;",
"sql"
);
session1.commitTransaction();
session1.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();

Session validator = new Session( testHelper );
validator.startTransaction();
Future<List<ExecutedContext>> futureValidation = validator.executeStatement(
Future<List<ExecutedContext>> futureValidation = validator.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1;",
"sql"
);
Expand All @@ -310,7 +310,7 @@ public void lostUpdateSimple() throws InterruptedException, ExecutionException,
assertEquals( 200, balance );
closeAndIgnore( validation );

validator.commitTransaction();
validator.commitTransactionAsync();
validator.awaitCompletion();

dropTables();
Expand All @@ -325,7 +325,7 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti
Session session2 = new Session( testHelper );

session1.startTransaction();
Future<List<ExecutedContext>> futureReadX = session1.executeStatement(
Future<List<ExecutedContext>> futureReadX = session1.executeStatementAsync(
"SELECT x FROM coordinates WHERE id = 1;",
"sql"
);
Expand All @@ -337,13 +337,13 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti
closeAndIgnore( readX );

session2.startTransaction();
session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"UPDATE coordinates SET x = 300, y = 400 WHERE id = 1;",
"sql"
);
session2.commitTransaction();
session2.commitTransactionAsync();

Future<List<ExecutedContext>> futureReadY = session1.executeStatement(
Future<List<ExecutedContext>> futureReadY = session1.executeStatementAsync(
"SELECT y FROM coordinates WHERE id = 1;",
"sql"
);
Expand All @@ -354,7 +354,7 @@ public void readSkewSimple() throws ExecutionException, InterruptedException, Ti
assertEquals( 200, yCoordinate );
closeAndIgnore( readY );

session1.commitTransaction();
session1.commitTransactionAsync();

session1.awaitCompletion();
session2.awaitCompletion();
Expand All @@ -370,61 +370,73 @@ public void writeSkewSimple() throws ExecutionException, InterruptedException, T
Session session1 = new Session( testHelper );
Session session2 = new Session( testHelper );

// This class represents transaction logic required for this test. In presence of stored procedures those could be used instead.
class Checker {

public void checkBalance1( List<ExecutedContext> result ) {
int totalBalance = result.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue();
if ( totalBalance < 0 ) {
session1.rollbackTransaction();
} else {
session1.commitTransaction();
}
}


public void checkBalance2( List<ExecutedContext> result ) {
int totalBalance = result.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue();
if ( totalBalance < 0 ) {
session2.rollbackTransaction();
} else {
session2.commitTransaction();
}
}

}
Checker checker = new Checker();

session1.startTransaction();
Future<List<ExecutedContext>> futureRead1 = session1.executeStatement(
Future<List<ExecutedContext>> futureRead1 = session1.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1 OR id = 2;",
"sql"
);
closeAndIgnore( futureRead1 );

session2.startTransaction();
Future<List<ExecutedContext>> futureRead2 = session2.executeStatement(
Future<List<ExecutedContext>> futureRead2 = session2.executeStatementAsync(
"SELECT balance FROM accounts WHERE id = 1 OR id = 2;",
"sql"
);

closeAndIgnore( futureRead1 );
closeAndIgnore( futureRead2 );

session1.executeStatementIgnoreResult(
session1.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = balance - 200 WHERE id = 1;",
"sql"
);
Future<List<ExecutedContext>> futureCheck1 = session1.executeStatement(
session1.executeStatementAndProcessAsync(
"SELECT SUM(balance) AS total_balance FROM accounts;",
"sql"
"sql",
checker::checkBalance1

);

session2.executeStatementIgnoreResult(
session2.executeStatementIgnoreResultAsync(
"UPDATE accounts SET balance = balance - 200 WHERE id = 2;",
"sql"
);
Future<List<ExecutedContext>> futureCheck2 = session2.executeStatement(
session2.executeStatementAndProcessAsync(
"SELECT SUM(balance) AS total_balance FROM accounts;",
"sql"
"sql",
checker::checkBalance2
);


List<ExecutedContext> result1 = futureCheck1.get( 1, TimeUnit.HOURS );
int totalBalance1 = result1.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue();
if ( totalBalance1 < 0 ) {
session1.rollbackTransaction();
} else {
session1.commitTransaction();
}

List<ExecutedContext> result2 = futureCheck2.get( 1, TimeUnit.HOURS );
int totalBalance2 = result2.get( 0 ).getIterator().getIterator().next()[0].asInteger().intValue();
if ( totalBalance2 < 0 ) {
session2.rollbackTransaction();
} else {
session2.commitTransaction();
}

session1.awaitCompletion();
session2.awaitCompletion();

Session validator = new Session( testHelper );
validator.startTransaction();
Future<List<ExecutedContext>> futureValidation = validator.executeStatement(
Future<List<ExecutedContext>> futureValidation = validator.executeStatementAsync(
"SELECT SUM(balance) AS total_balance FROM accounts;",
"sql"
);
Expand Down
28 changes: 24 additions & 4 deletions dbms/src/test/java/org/polypheny/db/transaction/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Getter;
import org.polypheny.db.TestHelper;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
Expand All @@ -29,14 +30,23 @@ boolean awaitCompletion() throws InterruptedException {
}


Future<List<ExecutedContext>> executeStatement( String query, String languageName ) {
Future<List<ExecutedContext>> executeStatementAsync( String query, String languageName ) {
if ( executorService.isShutdown() || executorService.isTerminated() ) {
executorService = Executors.newSingleThreadExecutor();
}
return executorService.submit( () -> ConcurrencyTestUtils.executeStatement( query, languageName, transaction, testHelper ) );
}

void executeStatementIgnoreResult( String query, String languageName ) {

void executeStatementAndProcessAsync( String query, String languageName, Consumer<List<ExecutedContext>> consumer ) {
if ( executorService.isShutdown() || executorService.isTerminated() ) {
executorService = Executors.newSingleThreadExecutor();
}
executorService.submit( () -> consumer.accept( ConcurrencyTestUtils.executeStatement( query, languageName, transaction, testHelper ) ) );
}


void executeStatementIgnoreResultAsync( String query, String languageName ) {
if ( executorService.isShutdown() || executorService.isTerminated() ) {
executorService = Executors.newSingleThreadExecutor();
}
Expand All @@ -49,13 +59,23 @@ void startTransaction() {
}


void commitTransactionAsync() {
executorService.submit( () -> transaction.commit() );
}


void rollbackTransactionAsync() {
executorService.submit( () -> transaction.rollback( "Requested by TestCase." ) );
}


void commitTransaction() {
executorService.submit(() -> transaction.commit());
transaction.commit();
}


void rollbackTransaction() {
executorService.submit(() -> transaction.rollback( "Requested by TestCase." ));
transaction.rollback( "Requested by TestCase." );
}

}

0 comments on commit 2d137a7

Please sign in to comment.