Skip to content

Commit

Permalink
Backport to branch(3) : Split Group Commit Emitter method for normal …
Browse files Browse the repository at this point in the history
…group emit and delayed group emit (#1863)

Co-authored-by: Mitsunori Komatsu <[email protected]>
  • Loading branch information
feeblefakie and komamitsu authored Jun 7, 2024
1 parent 6448e6f commit 40457de
Show file tree
Hide file tree
Showing 27 changed files with 701 additions and 524 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.scalar.db.api.TransactionState;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.transaction.consensuscommit.Coordinator.State;
import com.scalar.db.util.groupcommit.Emittable;
import com.scalar.db.util.groupcommit.GroupCommitConflictException;
import com.scalar.db.util.groupcommit.GroupCommitException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -30,8 +32,8 @@ public CommitHandlerWithGroupCommit(
super(storage, coordinator, tableMetadataManager, parallelExecutor);

checkNotNull(groupCommitter);
// This method reference will be called via GroupCommitter.ready().
groupCommitter.setEmitter(this::groupCommitState);
// The methods of this emitter will be called via GroupCommitter.ready().
groupCommitter.setEmitter(new Emitter(coordinator));
this.groupCommitter = groupCommitter;
}

Expand Down Expand Up @@ -84,33 +86,50 @@ public void commitState(Snapshot snapshot)
commitStateViaGroupCommit(snapshot);
}

// TODO: Emitter interface should have `emitNormalGroup()` and `emitDelayedGroup()` separately and
// this method should be separated into `groupCommitStateWithParentId()` and
// `groupCommitStateWithFullId()` so whether the id is a parent ID or a full ID is clear.
private void groupCommitState(String parentIdOrFullId, List<Snapshot> snapshots)
throws CoordinatorException {
if (snapshots.isEmpty()) {
// This means all buffered transactions were manually rolled back. Nothing to do.
return;
@Override
public TransactionState abortState(String id) throws UnknownTransactionStatusException {
cancelGroupCommitIfNeeded(id);
return super.abortState(id);
}

private static class Emitter implements Emittable<String, String, Snapshot> {
private final Coordinator coordinator;

public Emitter(Coordinator coordinator) {
this.coordinator = coordinator;
}

List<String> transactionIds =
snapshots.stream().map(Snapshot::getId).collect(Collectors.toList());
@Override
public void emitNormalGroup(String parentId, List<Snapshot> snapshots)
throws CoordinatorException {
if (snapshots.isEmpty()) {
// This means all buffered transactions were manually rolled back. Nothing to do.
return;
}

// The id is either of a parent ID in the case of normal group commit or a full ID in the case
// of delayed commit.
coordinator.putStateForGroupCommit(
parentIdOrFullId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis());
// These transactions are contained in a normal group that has multiple transactions.
// Therefore, the transaction states should be put together in Coordinator.State.
List<String> transactionIds =
snapshots.stream().map(Snapshot::getId).collect(Collectors.toList());

logger.debug(
"Transaction {} is committed successfully at {}",
parentIdOrFullId,
System.currentTimeMillis());
}
coordinator.putStateForGroupCommit(
parentId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis());

@Override
public TransactionState abortState(String id) throws UnknownTransactionStatusException {
cancelGroupCommitIfNeeded(id);
return super.abortState(id);
logger.debug(
"Transaction {} (parent ID) is committed successfully at {}",
parentId,
System.currentTimeMillis());
}

@Override
public void emitDelayedGroup(String fullId, Snapshot snapshot) throws CoordinatorException {
// This transaction is contained in a delayed group that has only a single transaction.
// Therefore, the transaction state can be committed as if it's a normal commit (not a
// group commit).
coordinator.putState(new State(fullId, TransactionState.COMMITTED));

logger.debug(
"Transaction {} is committed successfully at {}", fullId, System.currentTimeMillis());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,30 +125,23 @@ public void putState(Coordinator.State state) throws CoordinatorException {
put(put);
}

// TODO: This method should be separated into `putStateForGroupCommitWithParentId()` and
// `putStateForGroupCommitWithFullId()` so whether the id is a parent ID or a full ID is
// clear.
void putStateForGroupCommit(
String parentIdOrFullId,
List<String> fullIds,
TransactionState transactionState,
long createdAt)
String parentId, List<String> fullIds, TransactionState transactionState, long createdAt)
throws CoordinatorException {
State state;
// `id` can be either of a parent ID for normal group commit or a full ID for delayed commit.
if (keyManipulator.isFullKey(parentIdOrFullId)) {
// Put the state with the full ID for a delayed group that contains only a single transaction.
state = new State(parentIdOrFullId, transactionState, createdAt);
} else {
// Put the state with the parent ID for a normal group that contains multiple transactions,
// with a parent ID as a key and multiple child IDs.
List<String> childIds = new ArrayList<>(fullIds.size());
for (String fullId : fullIds) {
Keys<String, String, String> keys = keyManipulator.keysFromFullKey(fullId);
childIds.add(keys.childKey);
}
state = new State(parentIdOrFullId, childIds, transactionState, createdAt);

if (keyManipulator.isFullKey(parentId)) {
throw new AssertionError(
"This method is only for normal group commits that use a parent ID as the key");
}

// Put the state that contains a parent ID as the key and multiple child transaction IDs.
List<String> childIds = new ArrayList<>(fullIds.size());
for (String fullId : fullIds) {
Keys<String, String, String> keys = keyManipulator.keysFromFullKey(fullId);
childIds.add(keys.childKey);
}
State state = new State(parentId, childIds, transactionState, createdAt);

Put put = createPutWith(state);
put(put);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.concurrent.ThreadLocalRandom;

public class CoordinatorGroupCommitter
extends GroupCommitter<String, String, String, String, Snapshot> {
extends GroupCommitter<String, String, String, String, String, Snapshot> {
CoordinatorGroupCommitter(GroupCommitConfig config) {
super("coordinator", config, new CoordinatorGroupCommitKeyManipulator());
}
Expand Down Expand Up @@ -36,7 +36,7 @@ public static boolean isEnabled(ConsensusCommitConfig config) {
}

static class CoordinatorGroupCommitKeyManipulator
implements KeyManipulator<String, String, String, String> {
implements KeyManipulator<String, String, String, String, String> {
private static final int PRIMARY_KEY_SIZE = 24;
private static final char DELIMITER = ':';
private static final int MAX_FULL_KEY_SIZE = 64;
Expand Down Expand Up @@ -113,15 +113,15 @@ public Keys<String, String, String> keysFromFullKey(String fullKey) {
}

@Override
public String emitKeyFromFullKey(String s) {
public String emitFullKeyFromFullKey(String fullKey) {
// Return the string as is since the value is already String.
return s;
return fullKey;
}

@Override
public String emitKeyFromParentKey(String s) {
public String emitParentKeyFromParentKey(String parentKey) {
// Return the string as is since the value is already String.
return s;
return parentKey;
}
}
}
22 changes: 11 additions & 11 deletions core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.util.Collections;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

// A group for a delayed slot. This group contains only a single slot.
@ThreadSafe
class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V>
extends Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> {
class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
extends Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> {
private final FULL_KEY fullKey;

DelayedGroup(
GroupCommitConfig config,
FULL_KEY fullKey,
Emittable<EMIT_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY> keyManipulator) {
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
KeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
keyManipulator) {
super(emitter, keyManipulator, 1, config.oldGroupAbortTimeoutMillis());
this.fullKey = fullKey;
}
Expand All @@ -35,13 +35,13 @@ FULL_KEY fullKey(CHILD_KEY childKey) {
// slot. But just in case.
protected synchronized void delegateEmitTaskToWaiter() {
assert slots.size() == 1;
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> slot : slots.values()) {
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot :
slots.values()) {
// Pass `emitter` to ask the receiver's thread to emit the value
slot.delegateTaskToWaiter(
() ->
emitter.execute(
keyManipulator.emitKeyFromFullKey(fullKey),
Collections.singletonList(slot.value())));
emitter.emitDelayedGroup(
keyManipulator.emitFullKeyFromFullKey(fullKey), slot.value()));
// Return since the number of the slots is only 1.
return;
}
Expand All @@ -50,7 +50,7 @@ protected synchronized void delegateEmitTaskToWaiter() {
@Nullable
@Override
protected synchronized FULL_KEY reserveNewSlot(
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> slot) {
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot) {
slot.changeParentGroupToDelayedGroup(this);
FULL_KEY fullKey = super.reserveNewSlot(slot);
if (fullKey == null) {
Expand All @@ -64,7 +64,7 @@ protected synchronized FULL_KEY reserveNewSlot(
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DelayedGroup)) return false;
DelayedGroup<?, ?, ?, ?, ?> that = (DelayedGroup<?, ?, ?, ?, ?>) o;
DelayedGroup<?, ?, ?, ?, ?, ?> that = (DelayedGroup<?, ?, ?, ?, ?, ?>) o;
return Objects.equal(fullKey, that.fullKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@
// A worker manages NormalGroup instances to move delayed slots to a new DelayedGroup.
// Ready NormalGroup is passed to GroupCleanupWorker.
@ThreadSafe
class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V>
extends BackgroundWorker<NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V>> {
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> groupManager;
private final GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> groupCleanupWorker;
class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
extends BackgroundWorker<
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>> {
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
groupManager;
private final GroupCleanupWorker<
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
groupCleanupWorker;

DelayedSlotMoveWorker(
String label,
long queueCheckIntervalInMillis,
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> groupManager,
GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> groupCleanupWorker) {
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager,
GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
groupCleanupWorker) {
super(
label + "-group-commit-delayed-slot-move",
queueCheckIntervalInMillis,
Expand All @@ -29,15 +34,17 @@ class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V>
}

@Override
BlockingQueue<NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V>> createQueue() {
BlockingQueue<NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>>
createQueue() {
// Use a priority queue to prioritize groups based on their timeout values, processing groups
// with smaller timeout values first.
return new PriorityBlockingQueue<>(
64, Comparator.comparingLong(NormalGroup::delayedSlotMoveTimeoutAtMillis));
}

@Override
boolean processItem(NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_KEY, V> normalGroup) {
boolean processItem(
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> normalGroup) {
if (normalGroup.isReady()) {
groupCleanupWorker.add(normalGroup);
// Already ready. Should remove the item.
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/com/scalar/db/util/groupcommit/Emittable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
/**
* An emittable interface to emit multiple values at once.
*
* @param <EMIT_KEY> A key type that Emitter can interpret.
* @param <FULL_KEY> A full-key type that Emitter can interpret.
* @param <PARENT_KEY> A parent-key type that Emitter can interpret.
* @param <V> A value type to be set to a slot.
*/
@FunctionalInterface
public interface Emittable<EMIT_KEY, V> {
void execute(EMIT_KEY key, List<V> values) throws Exception;
public interface Emittable<PARENT_KEY, FULL_KEY, V> {
void emitNormalGroup(PARENT_KEY parentKey, List<V> values) throws Exception;

void emitDelayedGroup(FULL_KEY fullKey, V value) throws Exception;
}
Loading

0 comments on commit 40457de

Please sign in to comment.