Skip to content

Commit

Permalink
Backport to branch(3) : Prohibit the group commit feature with 2PC in…
Browse files Browse the repository at this point in the history
…terface (#1862)

Co-authored-by: Mitsunori Komatsu <[email protected]>
Co-authored-by: Josh Wong <[email protected]>
Co-authored-by: Toshihiro Suzuki <[email protected]>
  • Loading branch information
4 people authored Jun 7, 2024
1 parent 4b89f7c commit 6448e6f
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 243 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ jobs:
- name: Setup and execute Gradle 'integrationTestJdbc' task
uses: gradle/gradle-build-action@v3
with:
arguments: integrationTestJdbc -PjavaVersion=${{ env.JAVA_VERSION }} -PjavaVendor=${{ env.JAVA_VENDOR }} -PintegrationTestJavaRuntimeVersion=${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} -PintegrationTestJavaRuntimeVendor=${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }} -Dscalardb.jdbc.url=jdbc:postgresql://localhost:5432/ -Dscalardb.jdbc.username=postgres -Dscalardb.jdbc.password=postgres -Dscalar.db.consensus_commit.coordinator.group_commit.enabled=true -Dscalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=15000 --tests "**ConsensusCommit**"
arguments: integrationTestJdbc -PjavaVersion=${{ env.JAVA_VERSION }} -PjavaVendor=${{ env.JAVA_VENDOR }} -PintegrationTestJavaRuntimeVersion=${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} -PintegrationTestJavaRuntimeVendor=${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }} -Dscalardb.jdbc.url=jdbc:postgresql://localhost:5432/ -Dscalardb.jdbc.username=postgres -Dscalardb.jdbc.password=postgres -Dscalar.db.consensus_commit.coordinator.group_commit.enabled=true -Dscalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=15000 --tests "**.ConsensusCommit**"

- name: Upload Gradle test reports
if: always()
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,12 @@ public enum CoreError implements ScalarDbError {
"Resuming a transaction is not allowed in single CRUD operation transactions",
"",
""),
CONSENSUS_COMMIT_GROUP_COMMIT_WITH_TWO_PHASE_COMMIT_INTERFACE_NOT_ALLOWED(
Category.USER_ERROR,
"0141",
"Using the group commit feature on the Coordinator table with a two-phase commit interface is not allowed",
"",
""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ public CoordinatorGroupCommitter(ConsensusCommitConfig config) {
}

public static Optional<CoordinatorGroupCommitter> from(ConsensusCommitConfig config) {
if (config.isCoordinatorGroupCommitEnabled()) {
if (isEnabled(config)) {
return Optional.of(new CoordinatorGroupCommitter(config));
} else {
return Optional.empty();
}
}

public static boolean isEnabled(ConsensusCommitConfig config) {
return config.isCoordinatorGroupCommitEnabled();
}

static class CoordinatorGroupCommitKeyManipulator
implements KeyManipulator<String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class TwoPhaseConsensusCommit extends AbstractTwoPhaseCommitTransaction {
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
private boolean validated;
private boolean needRollback;
// Whether to write to the coordinator table.
@VisibleForTesting final boolean shouldManageState;

// For test
private Runnable beforeRecoveryHook = () -> {};
Expand All @@ -54,13 +52,11 @@ public TwoPhaseConsensusCommit(
CrudHandler crud,
CommitHandler commit,
RecoveryHandler recovery,
ConsensusCommitMutationOperationChecker mutationOperationChecker,
boolean shouldManageState) {
ConsensusCommitMutationOperationChecker mutationOperationChecker) {
this.crud = crud;
this.commit = commit;
this.recovery = recovery;
this.mutationOperationChecker = mutationOperationChecker;
this.shouldManageState = shouldManageState;
}

@Override
Expand Down Expand Up @@ -247,9 +243,7 @@ public void commit() throws CommitConflictException, UnknownTransactionStatusExc
}

try {
if (shouldManageState) {
commit.commitState(crud.getSnapshot());
}
commit.commitState(crud.getSnapshot());
} catch (CommitConflictException | UnknownTransactionStatusException e) {
// no need to rollback because the transaction has already been rolled back
needRollback = false;
Expand All @@ -267,14 +261,12 @@ public void rollback() throws RollbackException {
}

try {
if (shouldManageState) {
TransactionState state = commit.abortState(crud.getSnapshot().getId());
if (state == TransactionState.COMMITTED) {
throw new RollbackException(
CoreError.CONSENSUS_COMMIT_ROLLBACK_FAILED_BECAUSE_TRANSACTION_ALREADY_COMMITTED
.buildMessage(),
getId());
}
TransactionState state = commit.abortState(crud.getSnapshot().getId());
if (state == TransactionState.COMMITTED) {
throw new RollbackException(
CoreError.CONSENSUS_COMMIT_ROLLBACK_FAILED_BECAUSE_TRANSACTION_ALREADY_COMMITTED
.buildMessage(),
getId());
}
} catch (UnknownTransactionStatusException e) {
throw new RollbackException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.scalar.db.api.TransactionState;
import com.scalar.db.api.TwoPhaseCommitTransaction;
import com.scalar.db.common.ActiveTransactionManagedTwoPhaseCommitTransactionManager;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
Expand All @@ -34,7 +35,6 @@ public class TwoPhaseConsensusCommitManager
private final CommitHandler commit;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
private final CoordinatorGroupCommitter groupCommitter;

@SuppressFBWarnings("EI_EXPOSE_REP2")
@Inject
Expand All @@ -44,14 +44,14 @@ public TwoPhaseConsensusCommitManager(
this.storage = storage;
this.admin = admin;
config = new ConsensusCommitConfig(databaseConfig);
throwIfGroupCommitIsEnabled(config);
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor);
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}
Expand All @@ -63,14 +63,14 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) {
admin = storageFactory.getStorageAdmin();

config = new ConsensusCommitConfig(databaseConfig);
throwIfGroupCommitIsEnabled(config);
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
coordinator = new Coordinator(storage, config);
parallelExecutor = new ParallelExecutor(config);
recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
commit = createCommitHandler();
commit = new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor);
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}
Expand All @@ -85,31 +85,28 @@ public TwoPhaseConsensusCommitManager(DatabaseConfig databaseConfig) {
Coordinator coordinator,
ParallelExecutor parallelExecutor,
RecoveryHandler recovery,
CommitHandler commit,
CoordinatorGroupCommitter groupCommitter) {
CommitHandler commit) {
super(databaseConfig);
this.storage = storage;
this.admin = admin;
this.config = config;
throwIfGroupCommitIsEnabled(config);
tableMetadataManager =
new TransactionTableMetadataManager(
admin, databaseConfig.getMetadataCacheExpirationTimeSecs());
this.coordinator = coordinator;
this.parallelExecutor = parallelExecutor;
this.recovery = recovery;
this.commit = commit;
this.groupCommitter = groupCommitter;
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}

// `groupCommitter` must be set before calling this method.
private CommitHandler createCommitHandler() {
if (isGroupCommitEnabled()) {
return new CommitHandlerWithGroupCommit(
storage, coordinator, tableMetadataManager, parallelExecutor, groupCommitter);
} else {
return new CommitHandler(storage, coordinator, tableMetadataManager, parallelExecutor);
private void throwIfGroupCommitIsEnabled(ConsensusCommitConfig config) {
if (CoordinatorGroupCommitter.isEnabled(config)) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_GROUP_COMMIT_WITH_TWO_PHASE_COMMIT_INTERFACE_NOT_ALLOWED
.buildMessage());
}
}

Expand All @@ -135,12 +132,7 @@ TwoPhaseCommitTransaction begin(Isolation isolation, SerializableStrategy strate
@VisibleForTesting
TwoPhaseCommitTransaction begin(String txId, Isolation isolation, SerializableStrategy strategy)
throws TransactionException {
if (isGroupCommitEnabled()) {
txId = groupCommitter.reserve(txId);
}
// The coordinator service always commits coordinator states regardless of whether the group
// commit feature is enabled.
return createNewTransaction(txId, isolation, strategy, true);
return createNewTransaction(txId, isolation, strategy);
}

@Override
Expand All @@ -157,34 +149,19 @@ TwoPhaseCommitTransaction join(String txId, Isolation isolation, SerializableStr
return resume(txId);
}

// Participant services don't use the group commit feature even if it's enabled. They simply use
// the passed transaction ID that is managed by the coordinator service, which utilizes the
// group commit feature.
// Also, participant services don't commit coordinator states if the group commit feature is
// enabled.
return createNewTransaction(txId, isolation, strategy, !isGroupCommitEnabled());
return createNewTransaction(txId, isolation, strategy);
}

private TwoPhaseCommitTransaction createNewTransaction(
String txId, Isolation isolation, SerializableStrategy strategy, boolean isCoordinator)
throws TransactionException {
String txId, Isolation isolation, SerializableStrategy strategy) throws TransactionException {
Snapshot snapshot =
new Snapshot(txId, isolation, strategy, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);

// If the group commit feature is enabled, only the coordinator service must manage the
// coordinator table state of transactions. With the group commit feature enabled, transactions
// are grouped and managed in memory on a node based on various events (e.g., timeouts). It's
// highly likely that the coordinator and participants in the two-phase commit interface will
// group and manage transactions differently, resulting in attempts to store different
// transaction groups in the coordinator table. Therefore, TwoPhaseConsensusCommit must not
// commit or abort states if it's a participant when the group commit feature is enabled.
boolean shouldManageCoordinatorState = isCoordinator || !isGroupCommitEnabled();
TwoPhaseConsensusCommit transaction =
new TwoPhaseConsensusCommit(
crud, commit, recovery, mutationOperationChecker, shouldManageCoordinatorState);
new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker);
getNamespace().ifPresent(transaction::withNamespace);
getTable().ifPresent(transaction::withTable);
return decorate(transaction);
Expand Down Expand Up @@ -215,17 +192,10 @@ public TransactionState rollback(String txId) {
}
}

private boolean isGroupCommitEnabled() {
return groupCommitter != null;
}

@Override
public void close() {
storage.close();
admin.close();
parallelExecutor.close();
if (isGroupCommitEnabled()) {
groupCommitter.close();
}
}
}
Loading

0 comments on commit 6448e6f

Please sign in to comment.