Skip to content

Commit

Permalink
Support implicit pre-read in Consensus Commit (#1222)
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Nov 15, 2023
1 parent a31760d commit 4a400ad
Show file tree
Hide file tree
Showing 36 changed files with 1,622 additions and 637 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {}
@Disabled("JDBC transaction doesn't support rollback()")
@Override
public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {}

@Disabled
@Override
public void
putAndCommit_PutWithImplicitPreReadDisabledGivenForExisting_ShouldThrowCommitConflictException() {}
}
17 changes: 17 additions & 0 deletions core/src/main/java/com/scalar/db/api/OperationBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,23 @@ interface ClearValues<T> {
T clearValue(String columnName);
}

interface ImplicitPreReadEnabled<T> {
/**
* Disable implicit pre-read for this put operation.
*
* @return the operation builder
*/
T disableImplicitPreRead();

/**
* Sets whether implicit pre-read is enabled or not for this put operation.
*
* @param implicitPreReadEnabled whether implicit pre-read is enabled or not
* @return the operation builder
*/
T implicitPreReadEnabled(boolean implicitPreReadEnabled);
}

interface Limit<T> {
/**
* Sets the specified number of results to be returned
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/java/com/scalar/db/api/Put.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Put extends Mutation {

private final Map<String, Column<?>> columns;

private boolean implicitPreReadEnabled = true;

/**
* Constructs a {@code Put} with the specified partition {@link Key}.
*
Expand Down Expand Up @@ -80,6 +82,7 @@ public Put(Key partitionKey, Key clusteringKey) {
public Put(Put put) {
super(put);
columns = new LinkedHashMap<>(put.columns);
implicitPreReadEnabled = put.implicitPreReadEnabled;
}

/**
Expand Down Expand Up @@ -705,6 +708,7 @@ private void checkIfExists(String name) {
throw new IllegalArgumentException(name + " doesn't exist");
}
}

/**
* @deprecated As of release 3.6.0. Will be removed in release 5.0.0. Use the setter method of the
* Put builder instead; to create a Put builder, use {@link Put#newBuilder()}
Expand Down Expand Up @@ -750,6 +754,25 @@ public Put withCondition(MutationCondition condition) {
return (Put) super.withCondition(condition);
}

/**
* Returns whether implicit pre-read is enabled for this Put.
*
* @return whether implicit pre-read is enabled for this Put
*/
public boolean isImplicitPreReadEnabled() {
return implicitPreReadEnabled;
}

/**
* Sets whether implicit pre-read is enabled for this Put.
*
* @param implicitPreReadEnabled whether the implicit pre-read is enabled for this Put
*/
Put setImplicitPreReadEnabled(boolean implicitPreReadEnabled) {
this.implicitPreReadEnabled = implicitPreReadEnabled;
return this;
}

/**
* Indicates whether some other object is "equal to" this object. The other object is considered
* equal if:
Expand All @@ -775,12 +798,12 @@ public boolean equals(Object o) {
return false;
}
Put other = (Put) o;
return columns.equals(other.columns);
return columns.equals(other.columns) && implicitPreReadEnabled == other.implicitPreReadEnabled;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), columns);
return Objects.hash(super.hashCode(), columns, implicitPreReadEnabled);
}

@Override
Expand All @@ -793,6 +816,7 @@ public String toString() {
.add("columns", getColumns())
.add("consistency", getConsistency())
.add("condition", getCondition())
.add("implicitPreReadEnabled", isImplicitPreReadEnabled())
.toString();
}
}
31 changes: 30 additions & 1 deletion core/src/main/java/com/scalar/db/api/PutBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.scalar.db.api.OperationBuilder.ClusteringKey;
import com.scalar.db.api.OperationBuilder.Condition;
import com.scalar.db.api.OperationBuilder.Consistency;
import com.scalar.db.api.OperationBuilder.ImplicitPreReadEnabled;
import com.scalar.db.api.OperationBuilder.PartitionKeyBuilder;
import com.scalar.db.api.OperationBuilder.TableBuilder;
import com.scalar.db.api.OperationBuilder.Values;
Expand Down Expand Up @@ -76,11 +77,13 @@ public static class Buildable extends OperationBuilder.Buildable<Put>
implements ClusteringKey<Buildable>,
Consistency<Buildable>,
Condition<Buildable>,
Values<Buildable> {
Values<Buildable>,
ImplicitPreReadEnabled<Buildable> {
final Map<String, Column<?>> columns = new LinkedHashMap<>();
@Nullable Key clusteringKey;
@Nullable com.scalar.db.api.Consistency consistency;
@Nullable MutationCondition condition;
boolean implicitPreReadEnabled = true;

private Buildable(@Nullable String namespace, String table, Key partitionKey) {
super(namespace, table, partitionKey);
Expand Down Expand Up @@ -199,6 +202,18 @@ public Buildable value(Column<?> column) {
return this;
}

@Override
public Buildable disableImplicitPreRead() {
implicitPreReadEnabled = false;
return this;
}

@Override
public Buildable implicitPreReadEnabled(boolean implicitPreReadEnabled) {
this.implicitPreReadEnabled = implicitPreReadEnabled;
return this;
}

@Override
public Put build() {
Put put = new Put(partitionKey, clusteringKey);
Expand All @@ -210,6 +225,7 @@ public Put build() {
if (condition != null) {
put.withCondition(condition);
}
put.setImplicitPreReadEnabled(implicitPreReadEnabled);

return put;
}
Expand Down Expand Up @@ -237,6 +253,7 @@ public static class BuildableFromExisting extends Buildable
this.columns.putAll(put.getColumns());
this.consistency = put.getConsistency();
this.condition = put.getCondition().orElse(null);
this.implicitPreReadEnabled = put.isImplicitPreReadEnabled();
}

@Override
Expand Down Expand Up @@ -391,5 +408,17 @@ public BuildableFromExisting clearNamespace() {
this.namespaceName = null;
return this;
}

@Override
public Buildable disableImplicitPreRead() {
super.disableImplicitPreRead();
return this;
}

@Override
public Buildable implicitPreReadEnabled(boolean implicitPreReadEnabled) {
super.implicitPreReadEnabled(implicitPreReadEnabled);
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import com.scalar.db.api.Selection;
import com.scalar.db.common.AbstractDistributedTransaction;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CommitException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -94,7 +96,7 @@ public void put(Put put) throws CrudException {

@Override
public void put(List<Put> puts) throws CrudException {
checkArgument(puts.size() != 0);
checkArgument(!puts.isEmpty());
for (Put p : puts) {
put(p);
}
Expand All @@ -109,15 +111,15 @@ public void delete(Delete delete) throws CrudException {

@Override
public void delete(List<Delete> deletes) throws CrudException {
checkArgument(deletes.size() != 0);
checkArgument(!deletes.isEmpty());
for (Delete d : deletes) {
delete(d);
}
}

@Override
public void mutate(List<? extends Mutation> mutations) throws CrudException {
checkArgument(mutations.size() != 0);
checkArgument(!mutations.isEmpty());
for (Mutation m : mutations) {
if (m instanceof Put) {
put((Put) m);
Expand All @@ -129,6 +131,15 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {

@Override
public void commit() throws CommitException, UnknownTransactionStatusException {
// Execute implicit pre-read
try {
crud.readIfImplicitPreReadEnabled();
} catch (CrudConflictException e) {
throw new CommitConflictException("Conflict occurred while implicit pre-read", e, getId());
} catch (CrudException e) {
throw new CommitException("Failed to execute implicit pre-read", e, getId());
}

commit.commit(crud.getSnapshot());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class ConsensusCommitConfig {
public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled";
public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled";

public static final String PARALLEL_IMPLICIT_PRE_READ =
PREFIX + "parallel_implicit_pre_read.enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;

public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
Expand All @@ -48,6 +51,8 @@ public class ConsensusCommitConfig {

private final boolean isIncludeMetadataEnabled;

private final boolean parallelImplicitPreReadEnabled;

public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
String transactionManager = databaseConfig.getTransactionManager();
if (!"consensus-commit".equals(transactionManager)) {
Expand Down Expand Up @@ -102,10 +107,16 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);

asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false);

// Use the value of async commit for async rollback as default value
asyncRollbackEnabled =
getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);

isIncludeMetadataEnabled =
getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false);

parallelImplicitPreReadEnabled =
getBoolean(databaseConfig.getProperties(), PARALLEL_IMPLICIT_PRE_READ, true);
}

public Isolation getIsolation() {
Expand Down Expand Up @@ -151,4 +162,8 @@ public boolean isAsyncRollbackEnabled() {
public boolean isIncludeMetadataEnabled() {
return isIncludeMetadataEnabled;
}

public boolean isParallelImplicitPreReadEnabled() {
return parallelImplicitPreReadEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ DistributedTransaction begin(String txId, Isolation isolation, SerializableStrat
Snapshot snapshot =
new Snapshot(txId, isolation, strategy, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled);
new CrudHandler(
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
ConsensusCommit consensus =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker);
getNamespace().ifPresent(consensus::withNamespace);
Expand Down
Loading

0 comments on commit 4a400ad

Please sign in to comment.