From 40457def0eb97bc56e8b7ae13745d5430abae0ac Mon Sep 17 00:00:00 2001 From: Hiroyuki Yamada Date: Fri, 7 Jun 2024 18:18:04 +0900 Subject: [PATCH] Backport to branch(3) : Split Group Commit Emitter method for normal group emit and delayed group emit (#1863) Co-authored-by: Mitsunori Komatsu --- .../CommitHandlerWithGroupCommit.java | 69 ++++--- .../consensuscommit/Coordinator.java | 35 ++-- .../CoordinatorGroupCommitter.java | 12 +- .../db/util/groupcommit/DelayedGroup.java | 22 +-- .../groupcommit/DelayedSlotMoveWorker.java | 23 ++- .../scalar/db/util/groupcommit/Emittable.java | 10 +- .../com/scalar/db/util/groupcommit/Group.java | 47 +++-- .../util/groupcommit/GroupCleanupWorker.java | 17 +- .../util/groupcommit/GroupCommitMetrics.java | 1 - .../db/util/groupcommit/GroupCommitter.java | 82 +++++--- .../db/util/groupcommit/GroupManager.java | 78 +++++--- .../util/groupcommit/GroupSizeFixWorker.java | 26 ++- .../db/util/groupcommit/KeyManipulator.java | 9 +- .../db/util/groupcommit/NormalGroup.java | 41 ++-- .../com/scalar/db/util/groupcommit/Slot.java | 14 +- .../CoordinatorGroupCommitterTest.java | 35 ++-- .../consensuscommit/CoordinatorTest.java | 14 +- .../db/util/groupcommit/DelayedGroupTest.java | 108 +++++++---- .../DelayedSlotMoveWorkerTest.java | 21 ++- .../groupcommit/GroupCleanupWorkerTest.java | 16 +- .../GroupCommitterConcurrentTest.java | 58 +++--- .../util/groupcommit/GroupCommitterTest.java | 173 ++++++++++------- .../db/util/groupcommit/GroupManagerTest.java | 103 ++++++----- .../groupcommit/GroupSizeFixWorkerTest.java | 21 ++- .../db/util/groupcommit/NormalGroupTest.java | 175 ++++++++---------- .../scalar/db/util/groupcommit/SlotTest.java | 9 +- .../groupcommit/TestableKeyManipulator.java | 6 +- 27 files changed, 701 insertions(+), 524 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java index 5e210dabb1..addca7809b 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java @@ -6,6 +6,8 @@ import com.scalar.db.api.TransactionState; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; +import com.scalar.db.transaction.consensuscommit.Coordinator.State; +import com.scalar.db.util.groupcommit.Emittable; import com.scalar.db.util.groupcommit.GroupCommitConflictException; import com.scalar.db.util.groupcommit.GroupCommitException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -30,8 +32,8 @@ public CommitHandlerWithGroupCommit( super(storage, coordinator, tableMetadataManager, parallelExecutor); checkNotNull(groupCommitter); - // This method reference will be called via GroupCommitter.ready(). - groupCommitter.setEmitter(this::groupCommitState); + // The methods of this emitter will be called via GroupCommitter.ready(). + groupCommitter.setEmitter(new Emitter(coordinator)); this.groupCommitter = groupCommitter; } @@ -84,33 +86,50 @@ public void commitState(Snapshot snapshot) commitStateViaGroupCommit(snapshot); } - // TODO: Emitter interface should have `emitNormalGroup()` and `emitDelayedGroup()` separately and - // this method should be separated into `groupCommitStateWithParentId()` and - // `groupCommitStateWithFullId()` so whether the id is a parent ID or a full ID is clear. - private void groupCommitState(String parentIdOrFullId, List snapshots) - throws CoordinatorException { - if (snapshots.isEmpty()) { - // This means all buffered transactions were manually rolled back. Nothing to do. - return; + @Override + public TransactionState abortState(String id) throws UnknownTransactionStatusException { + cancelGroupCommitIfNeeded(id); + return super.abortState(id); + } + + private static class Emitter implements Emittable { + private final Coordinator coordinator; + + public Emitter(Coordinator coordinator) { + this.coordinator = coordinator; } - List transactionIds = - snapshots.stream().map(Snapshot::getId).collect(Collectors.toList()); + @Override + public void emitNormalGroup(String parentId, List snapshots) + throws CoordinatorException { + if (snapshots.isEmpty()) { + // This means all buffered transactions were manually rolled back. Nothing to do. + return; + } - // The id is either of a parent ID in the case of normal group commit or a full ID in the case - // of delayed commit. - coordinator.putStateForGroupCommit( - parentIdOrFullId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis()); + // These transactions are contained in a normal group that has multiple transactions. + // Therefore, the transaction states should be put together in Coordinator.State. + List transactionIds = + snapshots.stream().map(Snapshot::getId).collect(Collectors.toList()); - logger.debug( - "Transaction {} is committed successfully at {}", - parentIdOrFullId, - System.currentTimeMillis()); - } + coordinator.putStateForGroupCommit( + parentId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis()); - @Override - public TransactionState abortState(String id) throws UnknownTransactionStatusException { - cancelGroupCommitIfNeeded(id); - return super.abortState(id); + logger.debug( + "Transaction {} (parent ID) is committed successfully at {}", + parentId, + System.currentTimeMillis()); + } + + @Override + public void emitDelayedGroup(String fullId, Snapshot snapshot) throws CoordinatorException { + // This transaction is contained in a delayed group that has only a single transaction. + // Therefore, the transaction state can be committed as if it's a normal commit (not a + // group commit). + coordinator.putState(new State(fullId, TransactionState.COMMITTED)); + + logger.debug( + "Transaction {} is committed successfully at {}", fullId, System.currentTimeMillis()); + } } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java index cdeefcdced..3cdfb7dd8d 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Coordinator.java @@ -125,30 +125,23 @@ public void putState(Coordinator.State state) throws CoordinatorException { put(put); } - // TODO: This method should be separated into `putStateForGroupCommitWithParentId()` and - // `putStateForGroupCommitWithFullId()` so whether the id is a parent ID or a full ID is - // clear. void putStateForGroupCommit( - String parentIdOrFullId, - List fullIds, - TransactionState transactionState, - long createdAt) + String parentId, List fullIds, TransactionState transactionState, long createdAt) throws CoordinatorException { - State state; - // `id` can be either of a parent ID for normal group commit or a full ID for delayed commit. - if (keyManipulator.isFullKey(parentIdOrFullId)) { - // Put the state with the full ID for a delayed group that contains only a single transaction. - state = new State(parentIdOrFullId, transactionState, createdAt); - } else { - // Put the state with the parent ID for a normal group that contains multiple transactions, - // with a parent ID as a key and multiple child IDs. - List childIds = new ArrayList<>(fullIds.size()); - for (String fullId : fullIds) { - Keys keys = keyManipulator.keysFromFullKey(fullId); - childIds.add(keys.childKey); - } - state = new State(parentIdOrFullId, childIds, transactionState, createdAt); + + if (keyManipulator.isFullKey(parentId)) { + throw new AssertionError( + "This method is only for normal group commits that use a parent ID as the key"); } + + // Put the state that contains a parent ID as the key and multiple child transaction IDs. + List childIds = new ArrayList<>(fullIds.size()); + for (String fullId : fullIds) { + Keys keys = keyManipulator.keysFromFullKey(fullId); + childIds.add(keys.childKey); + } + State state = new State(parentId, childIds, transactionState, createdAt); + Put put = createPutWith(state); put(put); } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java index eedb7f5de9..f2783bd82c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java @@ -7,7 +7,7 @@ import java.util.concurrent.ThreadLocalRandom; public class CoordinatorGroupCommitter - extends GroupCommitter { + extends GroupCommitter { CoordinatorGroupCommitter(GroupCommitConfig config) { super("coordinator", config, new CoordinatorGroupCommitKeyManipulator()); } @@ -36,7 +36,7 @@ public static boolean isEnabled(ConsensusCommitConfig config) { } static class CoordinatorGroupCommitKeyManipulator - implements KeyManipulator { + implements KeyManipulator { private static final int PRIMARY_KEY_SIZE = 24; private static final char DELIMITER = ':'; private static final int MAX_FULL_KEY_SIZE = 64; @@ -113,15 +113,15 @@ public Keys keysFromFullKey(String fullKey) { } @Override - public String emitKeyFromFullKey(String s) { + public String emitFullKeyFromFullKey(String fullKey) { // Return the string as is since the value is already String. - return s; + return fullKey; } @Override - public String emitKeyFromParentKey(String s) { + public String emitParentKeyFromParentKey(String parentKey) { // Return the string as is since the value is already String. - return s; + return parentKey; } } } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java b/core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java index 91cb8cd547..cf851ab6ad 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java @@ -2,21 +2,21 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import java.util.Collections; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; // A group for a delayed slot. This group contains only a single slot. @ThreadSafe -class DelayedGroup - extends Group { +class DelayedGroup + extends Group { private final FULL_KEY fullKey; DelayedGroup( GroupCommitConfig config, FULL_KEY fullKey, - Emittable emitter, - KeyManipulator keyManipulator) { + Emittable emitter, + KeyManipulator + keyManipulator) { super(emitter, keyManipulator, 1, config.oldGroupAbortTimeoutMillis()); this.fullKey = fullKey; } @@ -35,13 +35,13 @@ FULL_KEY fullKey(CHILD_KEY childKey) { // slot. But just in case. protected synchronized void delegateEmitTaskToWaiter() { assert slots.size() == 1; - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { // Pass `emitter` to ask the receiver's thread to emit the value slot.delegateTaskToWaiter( () -> - emitter.execute( - keyManipulator.emitKeyFromFullKey(fullKey), - Collections.singletonList(slot.value()))); + emitter.emitDelayedGroup( + keyManipulator.emitFullKeyFromFullKey(fullKey), slot.value())); // Return since the number of the slots is only 1. return; } @@ -50,7 +50,7 @@ protected synchronized void delegateEmitTaskToWaiter() { @Nullable @Override protected synchronized FULL_KEY reserveNewSlot( - Slot slot) { + Slot slot) { slot.changeParentGroupToDelayedGroup(this); FULL_KEY fullKey = super.reserveNewSlot(slot); if (fullKey == null) { @@ -64,7 +64,7 @@ protected synchronized FULL_KEY reserveNewSlot( public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof DelayedGroup)) return false; - DelayedGroup that = (DelayedGroup) o; + DelayedGroup that = (DelayedGroup) o; return Objects.equal(fullKey, that.fullKey); } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorker.java b/core/src/main/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorker.java index b7f43e73cf..04cf2a3bd5 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorker.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorker.java @@ -8,16 +8,21 @@ // A worker manages NormalGroup instances to move delayed slots to a new DelayedGroup. // Ready NormalGroup is passed to GroupCleanupWorker. @ThreadSafe -class DelayedSlotMoveWorker - extends BackgroundWorker> { - private final GroupManager groupManager; - private final GroupCleanupWorker groupCleanupWorker; +class DelayedSlotMoveWorker + extends BackgroundWorker< + NormalGroup> { + private final GroupManager + groupManager; + private final GroupCleanupWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> + groupCleanupWorker; DelayedSlotMoveWorker( String label, long queueCheckIntervalInMillis, - GroupManager groupManager, - GroupCleanupWorker groupCleanupWorker) { + GroupManager groupManager, + GroupCleanupWorker + groupCleanupWorker) { super( label + "-group-commit-delayed-slot-move", queueCheckIntervalInMillis, @@ -29,7 +34,8 @@ class DelayedSlotMoveWorker } @Override - BlockingQueue> createQueue() { + BlockingQueue> + createQueue() { // Use a priority queue to prioritize groups based on their timeout values, processing groups // with smaller timeout values first. return new PriorityBlockingQueue<>( @@ -37,7 +43,8 @@ BlockingQueue> createQ } @Override - boolean processItem(NormalGroup normalGroup) { + boolean processItem( + NormalGroup normalGroup) { if (normalGroup.isReady()) { groupCleanupWorker.add(normalGroup); // Already ready. Should remove the item. diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/Emittable.java b/core/src/main/java/com/scalar/db/util/groupcommit/Emittable.java index adc49c9f0b..eb1e6cf93e 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/Emittable.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/Emittable.java @@ -5,10 +5,12 @@ /** * An emittable interface to emit multiple values at once. * - * @param A key type that Emitter can interpret. + * @param A full-key type that Emitter can interpret. + * @param A parent-key type that Emitter can interpret. * @param A value type to be set to a slot. */ -@FunctionalInterface -public interface Emittable { - void execute(EMIT_KEY key, List values) throws Exception; +public interface Emittable { + void emitNormalGroup(PARENT_KEY parentKey, List values) throws Exception; + + void emitDelayedGroup(FULL_KEY fullKey, V value) throws Exception; } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/Group.java b/core/src/main/java/com/scalar/db/util/groupcommit/Group.java index 2ea7c86a82..9dee71daaf 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/Group.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/Group.java @@ -10,14 +10,17 @@ // An abstract class that has logics and implementations to manage slots and trigger to emit it. @ThreadSafe -abstract class Group { +abstract class Group { private static final Logger logger = LoggerFactory.getLogger(Group.class); - protected final Emittable emitter; - protected final KeyManipulator keyManipulator; + protected final Emittable emitter; + protected final KeyManipulator + keyManipulator; private final int capacity; private final AtomicReference size = new AtomicReference<>(); - protected final Map> slots; + protected final Map< + CHILD_KEY, Slot> + slots; // Whether to reject a new value slot. protected final AtomicReference status = new AtomicReference<>(Status.OPEN); private final long oldGroupAbortTimeoutAtMillis; @@ -57,8 +60,9 @@ enum Status { } Group( - Emittable emitter, - KeyManipulator keyManipulator, + Emittable emitter, + KeyManipulator + keyManipulator, int capacity, long oldGroupAbortTimeoutMillis) { this.emitter = emitter; @@ -77,7 +81,7 @@ private boolean noMoreSlot() { // If it returns null, the Group is already size-fixed and a retry is needed. @Nullable protected synchronized FULL_KEY reserveNewSlot( - Slot slot) { + Slot slot) { if (isSizeFixed()) { return null; } @@ -88,8 +92,9 @@ protected synchronized FULL_KEY reserveNewSlot( return slot.fullKey(); } - private void reserveSlot(Slot slot) { - Slot oldSlot = slots.put(slot.key(), slot); + private void reserveSlot( + Slot slot) { + Slot oldSlot = slots.put(slot.key(), slot); if (oldSlot != null) { throw new AssertionError( String.format("An old slot exist unexpectedly. Slot: %s, Old slot: %s", slot, oldSlot)); @@ -100,8 +105,8 @@ private void reserveSlot(Slot slot // This sync is for moving timed-out value slot from a normal buf to a new delayed buf. // Returns null if the state of the group is changed (e.g., the slot is moved to another group). @Nullable - private synchronized Slot putValueToSlot( - CHILD_KEY childKey, V value) { + private synchronized Slot + putValueToSlot(CHILD_KEY childKey, V value) { if (isReady()) { logger.debug( "This group is already ready, but trying to put a value to the slot. Probably the slot is moved to a DelayedGroup. Retrying... Group: {}, Child key: {}", @@ -110,7 +115,8 @@ private synchronized Slot putValue return null; } - Slot slot = slots.get(childKey); + Slot slot = + slots.get(childKey); if (slot == null) { return null; } @@ -119,7 +125,7 @@ private synchronized Slot putValue } boolean putValueToSlotAndWait(CHILD_KEY childKey, V value) throws GroupCommitException { - Slot slot; + Slot slot; synchronized (this) { slot = putValueToSlot(childKey, value); if (slot == null) { @@ -178,7 +184,8 @@ synchronized void updateStatus() { if (newStatus == Status.SIZE_FIXED) { int readySlotCount = 0; - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { if (slot.isReady()) { readySlotCount++; } @@ -190,7 +197,8 @@ synchronized void updateStatus() { if (newStatus == Status.READY) { int doneSlotCount = 0; - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { if (slot.isDone()) { doneSlotCount++; } @@ -203,7 +211,8 @@ synchronized void updateStatus() { } synchronized boolean removeSlot(CHILD_KEY childKey) { - Slot slot = slots.get(childKey); + Slot slot = + slots.get(childKey); if (slot == null) { // Probably, the slot is already removed by the client or moved from NormalGroup to // DelayedGroup. @@ -221,7 +230,8 @@ synchronized boolean removeSlot(CHILD_KEY childKey) { return false; } - Slot removed = slots.remove(childKey); + Slot removed = + slots.remove(childKey); assert removed != null; if (size.get() != null && size.get() > 0) { @@ -234,7 +244,8 @@ synchronized boolean removeSlot(CHILD_KEY childKey) { } synchronized void abort() { - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { // Tell the clients that the slots are aborted. slot.markAsFailed( new GroupCommitException( diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCleanupWorker.java b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCleanupWorker.java index 97e423dcba..6736584101 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCleanupWorker.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCleanupWorker.java @@ -8,15 +8,18 @@ // A worker manages Group instances to removes completed ones. @ThreadSafe -class GroupCleanupWorker - extends BackgroundWorker> { +class GroupCleanupWorker + extends BackgroundWorker< + Group> { private static final Logger logger = LoggerFactory.getLogger(GroupCleanupWorker.class); - private final GroupManager groupManager; + private final GroupManager + groupManager; GroupCleanupWorker( String label, long queueCheckIntervalInMillis, - GroupManager groupManager) { + GroupManager + groupManager) { super( label + "-group-commit-group-cleanup", queueCheckIntervalInMillis, @@ -26,7 +29,8 @@ class GroupCleanupWorker } @Override - BlockingQueue> createQueue() { + BlockingQueue> + createQueue() { // Use a normal queue because: // - The timeout of the queued groups is large since it's for "just in case" // - In most cases a queued group gets DONE before it's timed-out @@ -36,7 +40,8 @@ BlockingQueue> createQueue() } @Override - boolean processItem(Group group) { + boolean processItem( + Group group) { if (group.oldGroupAbortTimeoutAtMillis() < System.currentTimeMillis()) { groupManager.removeGroupFromMap(group); group.abort(); diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitMetrics.java b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitMetrics.java index 6b639499b2..f75694ac6c 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitMetrics.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitMetrics.java @@ -3,7 +3,6 @@ import com.google.common.base.MoreObjects; import javax.annotation.concurrent.Immutable; -// TODO: Remove after introducing a proper metrics. @Immutable class GroupCommitMetrics { private final int queueLengthOfGroupCloseWorker; diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitter.java b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitter.java index 5dc4286221..a9b16d767f 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitter.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/GroupCommitter.java @@ -22,26 +22,35 @@ * @param A key type to slot in NormalGroup which can contain a value ready to commit. * @param A key type to DelayedGroup which contains a single slot and is * singly-committed. - * @param A key type that Emitter can interpret. + * @param A parent-key type that Emitter can interpret. + * @param A full-key type that Emitter can interpret. * @param A value type to be set to a slot. */ @ThreadSafe -public class GroupCommitter implements Closeable { +public class GroupCommitter + implements Closeable { private static final Logger logger = LoggerFactory.getLogger(GroupCommitter.class); // Background workers - private final GroupSizeFixWorker groupSizeFixWorker; - private final DelayedSlotMoveWorker + private final GroupSizeFixWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> + groupSizeFixWorker; + private final DelayedSlotMoveWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> delayedSlotMoveWorker; - private final GroupCleanupWorker groupCleanupWorker; + private final GroupCleanupWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> + groupCleanupWorker; // Monitor @Nullable private final GroupCommitMonitor groupCommitMonitor; // This contains logics of how to treat keys. - private final KeyManipulator keyManipulator; + private final KeyManipulator + keyManipulator; - private final GroupManager groupManager; + private final GroupManager + groupManager; private final AtomicBoolean closing = new AtomicBoolean(); @@ -53,7 +62,8 @@ public class GroupCommitter implem public GroupCommitter( String label, GroupCommitConfig config, - KeyManipulator keyManipulator) { + KeyManipulator + keyManipulator) { logger.info("Starting GroupCommitter. Label: {}, Config: {}", label, config); this.keyManipulator = keyManipulator; this.groupManager = createGroupManager(config, keyManipulator); @@ -87,7 +97,7 @@ GroupCommitMetrics getMetrics() { * * @param emitter An emitter. */ - public void setEmitter(Emittable emitter) { + public void setEmitter(Emittable emitter) { groupManager.setEmitter(emitter); } @@ -130,7 +140,8 @@ public void ready(FULL_KEY fullKey, V value) throws GroupCommitException { Keys keys = keyManipulator.keysFromFullKey(fullKey); boolean failed = false; while (true) { - Group group = groupManager.getGroup(keys); + Group group = + groupManager.getGroup(keys); if (group.putValueToSlotAndWait(keys.childKey, value)) { return; } @@ -195,41 +206,52 @@ public void close() { } @VisibleForTesting - GroupManager createGroupManager( - GroupCommitConfig config, - KeyManipulator keyManipulator) { + GroupManager + createGroupManager( + GroupCommitConfig config, + KeyManipulator + keyManipulator) { return new GroupManager<>(config, keyManipulator); } @VisibleForTesting - GroupCleanupWorker createGroupCleanupWorker( - String label, - GroupCommitConfig config, - GroupManager groupManager) { - GroupCleanupWorker worker = + GroupCleanupWorker + createGroupCleanupWorker( + String label, + GroupCommitConfig config, + GroupManager + groupManager) { + GroupCleanupWorker worker = new GroupCleanupWorker<>(label, config.timeoutCheckIntervalMillis(), groupManager); groupManager.setGroupCleanupWorker(worker); return worker; } @VisibleForTesting - DelayedSlotMoveWorker createDelayedSlotMoveWorker( - String label, - GroupCommitConfig config, - GroupManager groupManager, - GroupCleanupWorker groupCleanupWorker) { + DelayedSlotMoveWorker + createDelayedSlotMoveWorker( + String label, + GroupCommitConfig config, + GroupManager + groupManager, + GroupCleanupWorker + groupCleanupWorker) { return new DelayedSlotMoveWorker<>( label, config.timeoutCheckIntervalMillis(), groupManager, groupCleanupWorker); } @VisibleForTesting - GroupSizeFixWorker createGroupSizeFixWorker( - String label, - GroupCommitConfig config, - GroupManager groupManager, - DelayedSlotMoveWorker delayedSlotMoveWorker, - GroupCleanupWorker groupCleanupWorker) { - GroupSizeFixWorker worker = + GroupSizeFixWorker + createGroupSizeFixWorker( + String label, + GroupCommitConfig config, + GroupManager + groupManager, + DelayedSlotMoveWorker + delayedSlotMoveWorker, + GroupCleanupWorker + groupCleanupWorker) { + GroupSizeFixWorker worker = new GroupSizeFixWorker<>( label, config.timeoutCheckIntervalMillis(), delayedSlotMoveWorker, groupCleanupWorker); groupManager.setGroupSizeFixWorker(worker); diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/GroupManager.java b/core/src/main/java/com/scalar/db/util/groupcommit/GroupManager.java index 443d3c84e9..4663171db0 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/GroupManager.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/GroupManager.java @@ -15,18 +15,24 @@ import org.slf4j.LoggerFactory; @ThreadSafe -class GroupManager { +class GroupManager { private static final Logger logger = LoggerFactory.getLogger(GroupManager.class); // Groups - @Nullable private NormalGroup currentGroup; + @Nullable + private NormalGroup + currentGroup; // Note: Using ConcurrentHashMap results in less performance. @VisibleForTesting - protected final Map> + protected final Map< + PARENT_KEY, + NormalGroup> normalGroupMap = new HashMap<>(); @VisibleForTesting - protected final Map> + protected final Map< + FULL_KEY, + DelayedGroup> delayedGroupMap = new HashMap<>(); // Only this class uses this type of lock since the class can be heavy hotspot and StampedLock has @@ -35,31 +41,37 @@ class GroupManager { // Background workers @LazyInit - private GroupSizeFixWorker groupSizeFixWorker; + private GroupSizeFixWorker + groupSizeFixWorker; @LazyInit - private GroupCleanupWorker groupCleanupWorker; + private GroupCleanupWorker + groupCleanupWorker; // Custom operations injected by the client - private final KeyManipulator keyManipulator; - @LazyInit private Emittable emitter; + private final KeyManipulator + keyManipulator; + @LazyInit private Emittable emitter; private final GroupCommitConfig config; GroupManager( GroupCommitConfig config, - KeyManipulator keyManipulator) { + KeyManipulator + keyManipulator) { this.keyManipulator = keyManipulator; this.config = config; } void setGroupSizeFixWorker( - GroupSizeFixWorker groupSizeFixWorker) { + GroupSizeFixWorker + groupSizeFixWorker) { this.groupSizeFixWorker = groupSizeFixWorker; } void setGroupCleanupWorker( - GroupCleanupWorker groupCleanupWorker) { + GroupCleanupWorker + groupCleanupWorker) { this.groupCleanupWorker = groupCleanupWorker; } @@ -83,7 +95,7 @@ FULL_KEY reserveNewSlot(CHILD_KEY childKey) { } // Gets the corresponding group associated with the given key. - Group getGroup( + Group getGroup( Keys keys) throws GroupCommitException { long stamp = lock.writeLock(); try { @@ -91,13 +103,13 @@ Group getGroup( // with the parent key in `normalGroupMap` would return the NormalGroup even if the target // slot is already moved from the NormalGroup to the DelayedGroup. So, checking // `delayedGroupMap` first is necessary. - DelayedGroup delayedGroup = - delayedGroupMap.get(keys.fullKey); + DelayedGroup + delayedGroup = delayedGroupMap.get(keys.fullKey); if (delayedGroup != null) { return delayedGroup; } - NormalGroup normalGroup = + NormalGroup normalGroup = normalGroupMap.get(keys.parentKey); if (normalGroup != null) { return normalGroup; @@ -111,17 +123,22 @@ Group getGroup( } // Remove the specified group from group map. - boolean removeGroupFromMap(Group group) { + boolean removeGroupFromMap( + Group group) { long stamp = lock.writeLock(); try { if (group instanceof NormalGroup) { - NormalGroup normalGroup = - (NormalGroup) group; + NormalGroup + normalGroup = + (NormalGroup) + group; return normalGroupMap.remove(normalGroup.parentKey()) != null; } else { assert group instanceof DelayedGroup; - DelayedGroup delayedGroup = - (DelayedGroup) group; + DelayedGroup + delayedGroup = + (DelayedGroup) + group; return delayedGroupMap.remove(delayedGroup.fullKey()) != null; } } finally { @@ -135,13 +152,13 @@ boolean removeSlotFromGroup(Keys keys) { try { boolean removed = false; - DelayedGroup delayedGroup = - delayedGroupMap.get(keys.fullKey); + DelayedGroup + delayedGroup = delayedGroupMap.get(keys.fullKey); if (delayedGroup != null) { removed = delayedGroup.removeSlot(keys.childKey); } - NormalGroup normalGroup = + NormalGroup normalGroup = normalGroupMap.get(keys.parentKey); if (normalGroup != null) { removed = normalGroup.removeSlot(keys.childKey) || removed; @@ -159,14 +176,14 @@ boolean removeSlotFromGroup(Keys keys) { // // Returns true if any delayed slot is moved, false otherwise. boolean moveDelayedSlotToDelayedGroup( - NormalGroup normalGroup) { + NormalGroup normalGroup) { long stamp = lock.writeLock(); try { // TODO: NormalGroup.removeNotReadySlots() calls updateStatus() potentially resulting in // delegateEmitTaskToWaiter(). Maybe it should be called outside the lock. // Remove delayed tasks from the NormalGroup so that it can be ready. - List> notReadySlots = + List> notReadySlots = normalGroup.removeNotReadySlots(); if (notReadySlots == null) { normalGroup.updateDelayedSlotMoveTimeoutAt(); @@ -174,18 +191,19 @@ boolean moveDelayedSlotToDelayedGroup( "This group isn't needed to remove slots. Updated the timeout. Group: {}", normalGroup); return false; } - for (Slot notReadySlot : notReadySlots) { + for (Slot notReadySlot : + notReadySlots) { // Create a new DelayedGroup FULL_KEY fullKey = notReadySlot.fullKey(); - DelayedGroup delayedGroup = - new DelayedGroup<>(config, fullKey, emitter, keyManipulator); + DelayedGroup + delayedGroup = new DelayedGroup<>(config, fullKey, emitter, keyManipulator); // Set the slot stored in the NormalGroup into the new DelayedGroup. // Internally delegate the emit-task to the client thread. checkNotNull(delayedGroup.reserveNewSlot(notReadySlot)); // Register the new DelayedGroup to the map and cleanup queue. - DelayedGroup old = + DelayedGroup old = delayedGroupMap.put(fullKey, delayedGroup); if (old != null) { throw new AssertionError( @@ -205,7 +223,7 @@ boolean moveDelayedSlotToDelayedGroup( return true; } - void setEmitter(Emittable emitter) { + void setEmitter(Emittable emitter) { this.emitter = emitter; } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/GroupSizeFixWorker.java b/core/src/main/java/com/scalar/db/util/groupcommit/GroupSizeFixWorker.java index ca6d55f726..d5be5b5b0e 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/GroupSizeFixWorker.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/GroupSizeFixWorker.java @@ -8,17 +8,23 @@ // DelayedSlotMoveWorker. // Ready NormalGroup is passed to GroupCleanupWorker. @ThreadSafe -class GroupSizeFixWorker - extends BackgroundWorker> { - private final DelayedSlotMoveWorker +class GroupSizeFixWorker + extends BackgroundWorker< + NormalGroup> { + private final DelayedSlotMoveWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> delayedSlotMoveWorker; - private final GroupCleanupWorker groupCleanupWorker; + private final GroupCleanupWorker< + PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> + groupCleanupWorker; GroupSizeFixWorker( String label, long queueCheckIntervalInMillis, - DelayedSlotMoveWorker delayedSlotMoveWorker, - GroupCleanupWorker groupCleanupWorker) { + DelayedSlotMoveWorker + delayedSlotMoveWorker, + GroupCleanupWorker + groupCleanupWorker) { super( label + "-group-commit-normal-group-size-fix", queueCheckIntervalInMillis, @@ -28,7 +34,7 @@ class GroupSizeFixWorker } private void enqueueItemToNextQueue( - NormalGroup normalGroup) { + NormalGroup normalGroup) { if (normalGroup.isReady()) { groupCleanupWorker.add(normalGroup); } else { @@ -37,7 +43,8 @@ private void enqueueItemToNextQueue( } @Override - BlockingQueue> createQueue() { + BlockingQueue> + createQueue() { // Use a normal queue because: // - Queued groups are removed once processed, without being re-enqueued // - No need for a priority queue since the order of queued groups is basically consistent with @@ -46,7 +53,8 @@ BlockingQueue> createQ } @Override - boolean processItem(NormalGroup normalGroup) { + boolean processItem( + NormalGroup normalGroup) { // Size-fix the group if needed. if (normalGroup.isSizeFixed()) { enqueueItemToNextQueue(normalGroup); diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/KeyManipulator.java b/core/src/main/java/com/scalar/db/util/groupcommit/KeyManipulator.java index 25a5a09cdd..6c8413c7a5 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/KeyManipulator.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/KeyManipulator.java @@ -11,10 +11,11 @@ * @param A key type to slot in NormalGroup which can contain a value ready to commit. * @param A key type to DelayedGroup which contains a single slot and is * singly-committed. - * @param A key type that Emitter can interpret. + * @param A parent-key type that Emitter can interpret. + * @param A full-key type that Emitter can interpret. */ @Immutable -public interface KeyManipulator { +public interface KeyManipulator { class Keys { public final PARENT_KEY parentKey; public final CHILD_KEY childKey; @@ -44,7 +45,7 @@ public String toString() { Keys keysFromFullKey(FULL_KEY fullKey); - EMIT_KEY emitKeyFromFullKey(FULL_KEY fullKey); + EMIT_FULL_KEY emitFullKeyFromFullKey(FULL_KEY fullKey); - EMIT_KEY emitKeyFromParentKey(PARENT_KEY parentKey); + EMIT_PARENT_KEY emitParentKeyFromParentKey(PARENT_KEY parentKey); } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/NormalGroup.java b/core/src/main/java/com/scalar/db/util/groupcommit/NormalGroup.java index 758f283491..d91b52d90e 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/NormalGroup.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/NormalGroup.java @@ -15,8 +15,8 @@ // A group for multiple slots that will be group-committed at once. @ThreadSafe -class NormalGroup - extends Group { +class NormalGroup + extends Group { private static final Logger logger = LoggerFactory.getLogger(NormalGroup.class); private final PARENT_KEY parentKey; @@ -26,8 +26,9 @@ class NormalGroup NormalGroup( GroupCommitConfig config, - Emittable emitter, - KeyManipulator keyManipulator) { + Emittable emitter, + KeyManipulator + keyManipulator) { super(emitter, keyManipulator, config.slotCapacity(), config.oldGroupAbortTimeoutMillis()); this.delayedSlotMoveTimeoutMillis = config.delayedSlotMoveTimeoutMillis(); this.groupSizeFixTimeoutAtMillis = @@ -52,7 +53,8 @@ FULL_KEY reserveNewSlot(CHILD_KEY childKey) { } @Nullable - synchronized List> removeNotReadySlots() { + synchronized List> + removeNotReadySlots() { if (!isSizeFixed()) { logger.info( "No need to remove any slot since the size isn't fixed yet. Too early. Group: {}", this); @@ -61,10 +63,12 @@ synchronized List> removeNotR // Lazy instantiation might be better, but it's likely there is a not-ready value slot since // it's already timed-out. - List> removed = new ArrayList<>(); - for (Entry> entry : - slots.entrySet()) { - Slot slot = entry.getValue(); + List> removed = + new ArrayList<>(); + for (Entry> + entry : slots.entrySet()) { + Slot slot = + entry.getValue(); if (!slot.isReady()) { removed.add(slot); } @@ -78,7 +82,7 @@ synchronized List> removeNotR return null; } - for (Slot slot : removed) { + for (Slot slot : removed) { removeSlot(slot.key()); logger.debug( "Removed a value slot from group to move it to delayed group. Group: {}, Slot: {}", @@ -90,12 +94,13 @@ synchronized List> removeNotR @Override public synchronized void delegateEmitTaskToWaiter() { - final AtomicReference> emitterSlot = - new AtomicReference<>(); + final AtomicReference> + emitterSlot = new AtomicReference<>(); boolean isFirst = true; List values = new ArrayList<>(slots.size()); - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { // Use the first slot as an emitter. if (isFirst) { isFirst = false; @@ -113,12 +118,13 @@ public synchronized void delegateEmitTaskToWaiter() { logger.info("This group is already done, but trying to emit. Group: {}", this); return; } - emitter.execute(keyManipulator.emitKeyFromParentKey(parentKey), values); + emitter.emitNormalGroup(keyManipulator.emitParentKeyFromParentKey(parentKey), values); synchronized (this) { // Wake up the other waiting threads. // Pass null since the value is already emitted by the thread of `firstSlot`. - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { if (slot != emitterSlot.get()) { slot.markAsSuccess(); } @@ -130,7 +136,8 @@ public synchronized void delegateEmitTaskToWaiter() { // Let other threads know the exception. synchronized (this) { - for (Slot slot : slots.values()) { + for (Slot slot : + slots.values()) { if (slot != emitterSlot.get()) { slot.markAsFailed(exception); } @@ -161,7 +168,7 @@ long delayedSlotMoveTimeoutAtMillis() { public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof NormalGroup)) return false; - NormalGroup that = (NormalGroup) o; + NormalGroup that = (NormalGroup) o; return Objects.equal(parentKey, that.parentKey); } diff --git a/core/src/main/java/com/scalar/db/util/groupcommit/Slot.java b/core/src/main/java/com/scalar/db/util/groupcommit/Slot.java index db306b9c85..12f01886f3 100644 --- a/core/src/main/java/com/scalar/db/util/groupcommit/Slot.java +++ b/core/src/main/java/com/scalar/db/util/groupcommit/Slot.java @@ -12,9 +12,10 @@ // A container of value which is stored in a group. @ThreadSafe -class Slot { - private final AtomicReference> parentGroup = - new AtomicReference<>(); +class Slot { + private final AtomicReference< + Group> + parentGroup = new AtomicReference<>(); private final CHILD_KEY key; // If a result value is null, the value is already emitted. // Otherwise, the result lambda must be emitted by the receiver's thread. @@ -29,14 +30,17 @@ class Slot { // This value changes only from false to true. private final AtomicBoolean isDone = new AtomicBoolean(); - Slot(CHILD_KEY key, NormalGroup parentGroup) { + Slot( + CHILD_KEY key, + NormalGroup parentGroup) { this.key = key; this.parentGroup.set(parentGroup); } // This is called only once when being moved from NormalGroup to DelayedGroup. void changeParentGroupToDelayedGroup( - DelayedGroup parentGroup) { + DelayedGroup + parentGroup) { this.parentGroup.set(parentGroup); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitterTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitterTest.java index e447b67d1b..14318c1c46 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitterTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitterTest.java @@ -1,12 +1,10 @@ package com.scalar.db.transaction.consensuscommit; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator; @@ -33,8 +31,9 @@ class CoordinatorGroupCommitterTest { private final CoordinatorGroupCommitKeyManipulator keyManipulator = new CoordinatorGroupCommitKeyManipulator(); - @Mock private Emittable emitter; + @Mock private Emittable emitter; @Captor private ArgumentCaptor> snapshotsArgumentCaptor; + @Captor private ArgumentCaptor snapshotArgumentCaptor; @Test void reserve_GivenArbitraryChildTxId_ShouldReturnFullTxId() throws Exception { @@ -68,7 +67,13 @@ void reserve_GivenArbitraryChildTxId_ShouldReturnFullTxId() throws Exception { .isEqualTo(keyManipulator.keysFromFullKey(fullTxId4).parentKey); assertThat(keyManipulator.keysFromFullKey(fullTxId1).parentKey) .isNotEqualTo(keyManipulator.keysFromFullKey(fullTxId3).parentKey); - verify(emitter, never()).execute(anyString(), anyList()); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); + + groupCommitter.remove(fullTxId1); + groupCommitter.remove(fullTxId2); + groupCommitter.remove(fullTxId3); + groupCommitter.remove(fullTxId4); } } @@ -126,17 +131,17 @@ void ready_GivenArbitrarySnapshot_ShouldWaitUntilGroupCommitted() throws Excepti for (Future future : futures) { future.get(10, TimeUnit.SECONDS); } - verify(emitter, times(2)).execute(anyString(), anyList()); verify(emitter) - .execute( + .emitNormalGroup( eq(keyManipulator.keysFromFullKey(fullTxId1).parentKey), snapshotsArgumentCaptor.capture()); assertThat(snapshotsArgumentCaptor.getValue()).containsOnly(snapshot1, snapshot2); verify(emitter) - .execute( + .emitNormalGroup( eq(keyManipulator.keysFromFullKey(fullTxId3).parentKey), snapshotsArgumentCaptor.capture()); assertThat(snapshotsArgumentCaptor.getValue()).containsOnly(snapshot3, snapshot4); + verify(emitter, never()).emitDelayedGroup(any(), any()); } } @@ -179,14 +184,13 @@ void ready_GivenArbitrarySnapshotWithSomeDelay_ShouldWaitUntilSeparatelyGroupCom for (Future future : futures) { future.get(10, TimeUnit.SECONDS); } - verify(emitter, times(2)).execute(anyString(), anyList()); verify(emitter) - .execute( + .emitNormalGroup( eq(keyManipulator.keysFromFullKey(fullTxId1).parentKey), snapshotsArgumentCaptor.capture()); assertThat(snapshotsArgumentCaptor.getValue()).containsOnly(snapshot1); - verify(emitter).execute(eq(fullTxId2), snapshotsArgumentCaptor.capture()); - assertThat(snapshotsArgumentCaptor.getValue()).containsOnly(snapshot2); + verify(emitter).emitDelayedGroup(eq(fullTxId2), snapshotArgumentCaptor.capture()); + assertThat(snapshotArgumentCaptor.getValue()).isEqualTo(snapshot2); } } @@ -222,12 +226,12 @@ void remove_GivenOneOfFullTxIds_ShouldRemoveItAndProceedTheOther() throws Except // Short timeout is enough since there is no delayed transaction. future.get(1, TimeUnit.SECONDS); } - verify(emitter, times(1)).execute(anyString(), anyList()); verify(emitter) - .execute( + .emitNormalGroup( eq(keyManipulator.keysFromFullKey(fullTxId2).parentKey), snapshotsArgumentCaptor.capture()); assertThat(snapshotsArgumentCaptor.getValue()).containsOnly(snapshot); + verify(emitter, never()).emitDelayedGroup(any(), any()); } } @@ -249,7 +253,8 @@ void remove_GivenAllFullTxIds_ShouldRemoveAll() throws Exception { TimeUnit.MILLISECONDS.sleep(1000); // Assert - verify(emitter, never()).execute(anyString(), anyList()); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); } } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorTest.java index 563048631a..98251e20fa 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorTest.java @@ -583,7 +583,7 @@ public void putStateForGroupCommit_ParentIdGiven_ShouldPutWithCorrectValues( @EnumSource( value = TransactionState.class, names = {"COMMITTED", "ABORTED"}) - public void putStateForGroupCommit_FullIdGiven_ShouldPutWithCorrectValuesWithEmptyChildIds( + public void putStateForGroupCommit_FullIdGiven_ShouldThrowAssertionError( TransactionState transactionState) throws ExecutionException, CoordinatorException { // Arrange Coordinator spiedCoordinator = spy(coordinator); @@ -597,15 +597,11 @@ public void putStateForGroupCommit_FullIdGiven_ShouldPutWithCorrectValuesWithEmp doNothing().when(storage).put(any(Put.class)); // Act - spiedCoordinator.putStateForGroupCommit(fullId, fullIds, transactionState, current); - // Assert - - // With a full ID as `tx_id`, it's basically same as normal commits and `tx_child_ids` value can - // be empty. - verify(spiedCoordinator) - .createPutWith( - new Coordinator.State(fullId, Collections.emptyList(), transactionState, current)); + assertThatThrownBy( + () -> + spiedCoordinator.putStateForGroupCommit(fullId, fullIds, transactionState, current)) + .isInstanceOf(AssertionError.class); } @ParameterizedTest diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/DelayedGroupTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/DelayedGroupTest.java index a331eaac0f..aac2ab2f39 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/DelayedGroupTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/DelayedGroupTest.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.Uninterruptibles; +import com.scalar.db.util.ThrowableRunnable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -18,14 +19,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class DelayedGroupTest { - private Emittable emitter; + @Mock private Emittable emitter; private TestableKeyManipulator keyManipulator; @BeforeEach void setUp() { - emitter = (s, values) -> {}; // This generates parent keys which start with "0000" and increment by one for each subsequent // key ("0001", "0002"...). keyManipulator = new TestableKeyManipulator(); @@ -35,7 +39,7 @@ void setUp() { void fullKey_GivenFullKeyViaConstructor_ShouldReturnProperly() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - DelayedGroup group = + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); // Act @@ -47,10 +51,11 @@ void fullKey_GivenFullKeyViaConstructor_ShouldReturnProperly() { void reserveNewSlot_GivenArbitrarySlot_ShouldStoreIt() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup oldGroup = + NormalGroup oldGroup = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot = spy(new Slot<>("child-key", oldGroup)); - DelayedGroup group = + Slot slot = + spy(new Slot<>("child-key", oldGroup)); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); assertThat(group.size()).isNull(); @@ -70,10 +75,10 @@ void reserveNewSlot_GivenArbitrarySlot_ShouldStoreIt() { void reserveNewSlot_GivenAlreadyReservedSlot_ShouldThrowException() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup oldGroup = + NormalGroup oldGroup = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = + Slot slot = new Slot<>("child-key", oldGroup); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); group.reserveNewSlot(slot); @@ -83,6 +88,24 @@ void reserveNewSlot_GivenAlreadyReservedSlot_ShouldThrowException() { assertThat(group.isReady()).isFalse(); } + private Emittable createEmitter(ThrowableRunnable task) { + return new Emittable() { + @Override + public void emitNormalGroup(String parentKey, List values) { + throw new AssertionError(); + } + + @Override + public void emitDelayedGroup(String fullKey, Integer value) { + try { + task.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + @Test void putValueToSlotAndWait_WithSuccessfulEmitTask_ShouldExecuteTaskProperly() throws InterruptedException, ExecutionException { @@ -90,15 +113,17 @@ void putValueToSlotAndWait_WithSuccessfulEmitTask_ShouldExecuteTaskProperly() GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); AtomicBoolean emitted = new AtomicBoolean(); CountDownLatch wait = new CountDownLatch(1); - Emittable waitableEmitter = - (s, values) -> { - wait.await(); - emitted.set(true); - }; - NormalGroup oldGroup = + Emittable waitableEmitter = + createEmitter( + () -> { + wait.await(); + emitted.set(true); + }); + + NormalGroup oldGroup = new NormalGroup<>(config, waitableEmitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = + Slot slot = new Slot<>("child-key", oldGroup); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", waitableEmitter, keyManipulator); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -140,15 +165,16 @@ void putValueToSlotAndWait_WithFailingEmitTask_ShouldFail() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); CountDownLatch wait = new CountDownLatch(1); - Emittable failingEmitter = - (s, values) -> { - wait.await(); - throw new RuntimeException("Something is wrong"); - }; - NormalGroup oldGroup = + Emittable failingEmitter = + createEmitter( + () -> { + wait.await(); + throw new RuntimeException("Something is wrong"); + }); + NormalGroup oldGroup = new NormalGroup<>(config, failingEmitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = + Slot slot = new Slot<>("child-key", oldGroup); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", failingEmitter, keyManipulator); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -187,10 +213,10 @@ void putValueToSlotAndWait_WithFailingEmitTask_ShouldFail() { void removeSlot_GivenNoReadySlot_ShouldRemoveSlotAndGetDone() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup oldGroup = + NormalGroup oldGroup = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = + Slot slot = new Slot<>("child-key", oldGroup); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); group.reserveNewSlot(slot); @@ -213,15 +239,16 @@ void removeSlot_GivenReadySlot_ShouldDoNothing() throws InterruptedException, Ex GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); AtomicBoolean emitted = new AtomicBoolean(); CountDownLatch wait = new CountDownLatch(1); - Emittable waitableEmitter = - (s, values) -> { - wait.await(); - emitted.set(true); - }; - NormalGroup oldGroup = + Emittable waitableEmitter = + createEmitter( + () -> { + wait.await(); + emitted.set(true); + }); + NormalGroup oldGroup = new NormalGroup<>(config, waitableEmitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = + Slot slot = new Slot<>("child-key", oldGroup); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", waitableEmitter, keyManipulator); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -263,7 +290,7 @@ void oldGroupAbortTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnProperl // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); long minOfCurrentTimeMillis = System.currentTimeMillis(); - DelayedGroup group = + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); long maxOfCurrentTimeMillis = System.currentTimeMillis(); @@ -278,10 +305,11 @@ void oldGroupAbortTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnProperl void abort_ShouldAbortSlot() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup oldGroup = + NormalGroup oldGroup = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot = spy(new Slot<>("child-key", oldGroup)); - DelayedGroup group = + Slot slot = + spy(new Slot<>("child-key", oldGroup)); + DelayedGroup group = new DelayedGroup<>(config, "0000:full-key", emitter, keyManipulator); group.reserveNewSlot(slot); diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorkerTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorkerTest.java index 19b05fedfd..e98d9d9696 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorkerTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorkerTest.java @@ -23,14 +23,17 @@ @ExtendWith(MockitoExtension.class) class DelayedSlotMoveWorkerTest { private static final int LONG_WAIT_MILLIS = 500; - @Mock private GroupCleanupWorker groupCleanupWorker; - @Mock private NormalGroup normalGroup1; - @Mock private NormalGroup normalGroup2; - @Mock private NormalGroup normalGroup3; - @Mock private NormalGroup normalGroup4; - @Mock private GroupManager groupManager; - private DelayedSlotMoveWorker worker; - private DelayedSlotMoveWorker workerWithWait; + + @Mock + private GroupCleanupWorker groupCleanupWorker; + + @Mock private NormalGroup normalGroup1; + @Mock private NormalGroup normalGroup2; + @Mock private NormalGroup normalGroup3; + @Mock private NormalGroup normalGroup4; + @Mock private GroupManager groupManager; + private DelayedSlotMoveWorker worker; + private DelayedSlotMoveWorker workerWithWait; @BeforeEach void setUp() { @@ -47,7 +50,7 @@ void tearDown() { } private void doReturnOldGroupAbortTimeoutAtMillis( - NormalGroup normalGroup) { + NormalGroup normalGroup) { long oldGroupAbortMillis = System.currentTimeMillis() + 60 * 1000; doReturn(oldGroupAbortMillis).when(normalGroup).oldGroupAbortTimeoutAtMillis(); } diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCleanupWorkerTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCleanupWorkerTest.java index 001272208e..234702ff6c 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCleanupWorkerTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCleanupWorkerTest.java @@ -21,13 +21,13 @@ @ExtendWith(MockitoExtension.class) class GroupCleanupWorkerTest { private static final int LONG_WAIT_MILLIS = 500; - @Mock private NormalGroup normalGroup1; - @Mock private NormalGroup normalGroup2; - @Mock private DelayedGroup delayedGroup1; - @Mock private DelayedGroup delayedGroup2; - @Mock private GroupManager groupManager; - private GroupCleanupWorker worker; - private GroupCleanupWorker workerWithWait; + @Mock private NormalGroup normalGroup1; + @Mock private NormalGroup normalGroup2; + @Mock private DelayedGroup delayedGroup1; + @Mock private DelayedGroup delayedGroup2; + @Mock private GroupManager groupManager; + private GroupCleanupWorker worker; + private GroupCleanupWorker workerWithWait; @BeforeEach void setUp() { @@ -42,7 +42,7 @@ void tearDown() { } private void doReturnOldGroupAbortTimeoutAtMillis( - Group group) { + Group group) { long oldGroupAbortMillis = System.currentTimeMillis() + 60 * 1000; doReturn(oldGroupAbortMillis).when(group).oldGroupAbortTimeoutAtMillis(); } diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterConcurrentTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterConcurrentTest.java index b9dc39d3c6..cd1e2c2a84 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterConcurrentTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterConcurrentTest.java @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -28,7 +29,8 @@ public ExpectedException(String message) { } } - private static class MyKeyManipulator implements KeyManipulator { + private static class MyKeyManipulator + implements KeyManipulator { @Override public String generateParentKey() { return UUID.randomUUID().toString(); @@ -55,12 +57,12 @@ public Keys keysFromFullKey(String fullKey) { } @Override - public String emitKeyFromFullKey(String s) { + public String emitFullKeyFromFullKey(String s) { return s; } @Override - public String emitKeyFromParentKey(String s) { + public String emitParentKeyFromParentKey(String s) { return s; } } @@ -150,32 +152,44 @@ private static class Runner { } // Returns a lambda that will be executed once the group is ready to group-commit. - private Emittable emitter() { - return (parentKey, values) -> { - if (maxEmitDurationInMillis > 0) { - int waitInMillis = rand.nextInt(maxEmitDurationInMillis); - Uninterruptibles.sleepUninterruptibly(waitInMillis, TimeUnit.MILLISECONDS); - } - if (errorAfterReadyPercentage > rand.nextInt(100)) { + private Emittable createEmitter() { + return new Emittable() { + private void emit(String ignored, List values) { + if (maxEmitDurationInMillis > 0) { + int waitInMillis = rand.nextInt(maxEmitDurationInMillis); + Uninterruptibles.sleepUninterruptibly(waitInMillis, TimeUnit.MILLISECONDS); + } + if (errorAfterReadyPercentage > rand.nextInt(100)) { + for (Value v : values) { + // Remember the value as a failure. + if (failedKeys.put(v.v, true) != null) { + throw new RuntimeException(v + " is already set"); + } + } + throw new ExpectedException("Error after READY"); + } for (Value v : values) { - // Remember the value as a failure. - if (failedKeys.put(v.v, true) != null) { + // Remember the value as a success. + if (emittedKeys.put(v.v, true) != null) { throw new RuntimeException(v + " is already set"); } } - throw new ExpectedException("Error after READY"); } - for (Value v : values) { - // Remember the value as a success. - if (emittedKeys.put(v.v, true) != null) { - throw new RuntimeException(v + " is already set"); - } + + @Override + public void emitNormalGroup(String parentKey, List values) throws Exception { + emit(parentKey, values); + } + + @Override + public void emitDelayedGroup(String fullKey, Value value) throws Exception { + emit(fullKey, Collections.singletonList(value)); } }; } private Callable groupCommitterCaller( - GroupCommitter groupCommitter, + GroupCommitter groupCommitter, String childKey, Value value) { return () -> { @@ -217,9 +231,9 @@ private Callable groupCommitterCaller( private void exec(GroupCommitConfig groupCommitConfig) throws ExecutionException, InterruptedException, TimeoutException { - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = new GroupCommitter<>("test", groupCommitConfig, new MyKeyManipulator())) { - groupCommitter.setEmitter(emitter()); + groupCommitter.setEmitter(createEmitter()); List futures = new ArrayList<>(); @@ -294,7 +308,7 @@ private void checkResults() { } private void checkGarbage( - GroupCommitter groupCommitter) { + GroupCommitter groupCommitter) { boolean noGarbage = false; GroupCommitMetrics groupCommitMetrics = null; for (int i = 0; i < 60; i++) { diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterTest.java index 16cda37f93..74020a489a 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/GroupCommitterTest.java @@ -3,7 +3,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -33,14 +32,19 @@ class GroupCommitterTest { private static final int OLD_GROUP_ABORT_TIMEOUT_MILLIS = 10000; private static final int TIMEOUT_CHECK_INTERVAL_MILLIS = 10; - @Mock private Emittable emitter; - @Mock private GroupManager groupManager; - @Mock private GroupSizeFixWorker groupSizeFixWorker; + @Mock private Emittable emitter; + @Mock private GroupManager groupManager; @Mock - private DelayedSlotMoveWorker delayedSlotMoveWorker; + private GroupSizeFixWorker groupSizeFixWorker; + + @Mock + private DelayedSlotMoveWorker + delayedSlotMoveWorker; + + @Mock + private GroupCleanupWorker groupCleanupWorker; - @Mock private GroupCleanupWorker groupCleanupWorker; @Mock private GroupCommitMonitor groupCommitMonitor; @Mock private GroupCommitMetrics groupCommitMetrics; @@ -53,46 +57,51 @@ class GroupCommitterTest { // Use only a single instance at most in a test case. private class TestableGroupCommitter - extends GroupCommitter { + extends GroupCommitter { TestableGroupCommitter( - GroupCommitConfig config, KeyManipulator keyManipulator) { + GroupCommitConfig config, + KeyManipulator keyManipulator) { super("test", config, keyManipulator); } @Override - GroupManager createGroupManager( - GroupCommitConfig config, KeyManipulator keyManipulator) { + GroupManager createGroupManager( + GroupCommitConfig config, + KeyManipulator keyManipulator) { testableGroupCommitterGroupManagerCreated.set(true); return groupManager; } @Override - GroupSizeFixWorker createGroupSizeFixWorker( + GroupSizeFixWorker createGroupSizeFixWorker( String label, GroupCommitConfig config, - GroupManager groupManager, - DelayedSlotMoveWorker delayedSlotMoveWorker, - GroupCleanupWorker groupCleanupWorker) { + GroupManager groupManager, + DelayedSlotMoveWorker + delayedSlotMoveWorker, + GroupCleanupWorker groupCleanupWorker) { testableGroupCommitterGroupSizeFixWorkerCreated.set(true); return groupSizeFixWorker; } @Override - DelayedSlotMoveWorker createDelayedSlotMoveWorker( - String label, - GroupCommitConfig config, - GroupManager groupManager, - GroupCleanupWorker groupCleanupWorker) { + DelayedSlotMoveWorker + createDelayedSlotMoveWorker( + String label, + GroupCommitConfig config, + GroupManager groupManager, + GroupCleanupWorker + groupCleanupWorker) { testableGroupCommitterDelayedSlotMoveWorkerCreated.set(true); return delayedSlotMoveWorker; } @Override - GroupCleanupWorker createGroupCleanupWorker( + GroupCleanupWorker createGroupCleanupWorker( String label, GroupCommitConfig config, - GroupManager groupManager) { + GroupManager groupManager) { testableGroupCommitterGroupCleanupWorkerCreated.set(true); return groupCleanupWorker; } @@ -118,7 +127,7 @@ void setUp() { testableGroupCommitterGroupCommitMonitorCreated.set(false); } - private GroupCommitter createGroupCommitter( + private GroupCommitter createGroupCommitter( int slotCapacity, int groupSizeFixTimeoutMillis, int delayedSlotMoveTimeoutMillis) { return new GroupCommitter<>( "test", @@ -166,25 +175,33 @@ void initialize_GivenMetricsMonitorLogDisalbled_ShouldStartAllWorkersExceptForMo @Test void reserve_GivenArbitraryChildKey_ShouldReturnFullKeyProperly() throws Exception { // Arrange - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(emitter); // Act + String fullKey1 = groupCommitter.reserve("child-key-1"); + String fullKey2 = groupCommitter.reserve("child-key-2"); + String fullKey3 = groupCommitter.reserve("child-key-3"); + // Assert + assertThat(fullKey1).isEqualTo("0000:child-key-1"); + assertThat(fullKey2).isEqualTo("0000:child-key-2"); + assertThat(fullKey3).isEqualTo("0001:child-key-3"); - assertThat(groupCommitter.reserve("child-key-1")).isEqualTo("0000:child-key-1"); - assertThat(groupCommitter.reserve("child-key-2")).isEqualTo("0000:child-key-2"); - assertThat(groupCommitter.reserve("child-key-3")).isEqualTo("0001:child-key-3"); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); - verify(emitter, never()).execute(any(), any()); + groupCommitter.remove(fullKey1); + groupCommitter.remove(fullKey2); + groupCommitter.remove(fullKey3); } } @Test void reserve_WhenAlreadyClosed_ShouldThrowException() { // Arrange - GroupCommitter groupCommitter = + GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400); groupCommitter.setEmitter(emitter); groupCommitter.close(); @@ -198,7 +215,7 @@ void reserve_WhenAlreadyClosed_ShouldThrowException() { void ready_WhenTwoSlotsAreReadyInNormalGroup_WithSuccessfulEmitTask_ShouldEmitThem() throws Exception { // Arrange - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(emitter); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -206,7 +223,7 @@ void ready_WhenTwoSlotsAreReadyInNormalGroup_WithSuccessfulEmitTask_ShouldEmitTh // Reserve 3 slots. String fullKey1 = groupCommitter.reserve("child-key-1"); String fullKey2 = groupCommitter.reserve("child-key-2"); - groupCommitter.reserve("child-key-3"); + String fullKey3 = groupCommitter.reserve("child-key-3"); // There should be the following groups at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) @@ -227,25 +244,31 @@ void ready_WhenTwoSlotsAreReadyInNormalGroup_WithSuccessfulEmitTask_ShouldEmitTh // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) // Assert - verify(emitter).execute("0000", Arrays.asList(11, 22)); - verify(emitter, never()).execute(eq("0001"), any()); + verify(emitter).emitNormalGroup("0000", Arrays.asList(11, 22)); + verify(emitter, never()).emitDelayedGroup(any(), any()); + + groupCommitter.remove(fullKey3); } } @Test void ready_WhenTwoSlotsAreReadyInNormalGroup_WithFailingEmitTask_ShouldFail() throws Exception { // Arrange - Emittable failingEmitter = + Emittable failingEmitter = spy( - // This should be an anonymous class since `spy()` can't handle a lambda. - new Emittable() { + new Emittable() { + @Override + public void emitNormalGroup(String parentKey, List values) { + throw new RuntimeException("Something is wrong"); + } + @Override - public void execute(String key, List values) { + public void emitDelayedGroup(String fullKey, Integer value) throws Exception { throw new RuntimeException("Something is wrong"); } }); - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(failingEmitter); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -253,7 +276,7 @@ public void execute(String key, List values) { // Reserve 3 slots. String fullKey1 = groupCommitter.reserve("child-key-1"); String fullKey2 = groupCommitter.reserve("child-key-2"); - groupCommitter.reserve("child-key-3"); + String fullKey3 = groupCommitter.reserve("child-key-3"); // There should be the following groups at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) @@ -276,8 +299,10 @@ public void execute(String key, List values) { // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) // Assert - verify(failingEmitter).execute("0000", Arrays.asList(11, 22)); - verify(failingEmitter, never()).execute(eq("0001"), any()); + verify(failingEmitter).emitNormalGroup("0000", Arrays.asList(11, 22)); + verify(failingEmitter, never()).emitDelayedGroup(any(), any()); + + groupCommitter.remove(fullKey3); } } @@ -286,15 +311,15 @@ void ready_WhenOnlyOneOfTwoSlotsIsReadyInNormalGroup_ShouldJustWait() throws Exc // Arrange // `delayedSlotMoveTimeoutMillis` is enough long to wait `ready()`. - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 2000)) { groupCommitter.setEmitter(emitter); ExecutorService executorService = Executors.newCachedThreadPool(); // Reserve 3 slots. String fullKey1 = groupCommitter.reserve("child-key-1"); - groupCommitter.reserve("child-key-2"); - groupCommitter.reserve("child-key-3"); + String fullKey2 = groupCommitter.reserve("child-key-2"); + String fullKey3 = groupCommitter.reserve("child-key-3"); // There should be the following groups at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) @@ -314,7 +339,12 @@ void ready_WhenOnlyOneOfTwoSlotsIsReadyInNormalGroup_ShouldJustWait() throws Exc // yet. // So, this timeout must happen. assertThrows(TimeoutException.class, () -> future.get(1000, TimeUnit.MILLISECONDS)); - verify(emitter, never()).execute(any(), any()); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); + + groupCommitter.remove(fullKey1); + groupCommitter.remove(fullKey2); + groupCommitter.remove(fullKey3); } } @@ -322,7 +352,7 @@ void ready_WhenOnlyOneOfTwoSlotsIsReadyInNormalGroup_ShouldJustWait() throws Exc void ready_WhenSlotIsReadyInDelayedGroup_WithSuccessfulEmitTask_ShouldEmitThem() throws Exception { // Arrange - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(emitter); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -342,8 +372,8 @@ void ready_WhenSlotIsReadyInDelayedGroup_WithSuccessfulEmitTask_ShouldEmitThem() // There should be the following groups at this moment. // - NormalGroup("0000", DONE, slots:[Slot(READY, "child-key-1")]) // - DelayedGroup("0000:child-key-2", SIZE-FIXED, slots:[Slot("child-key-2")]) - verify(emitter).execute("0000", Collections.singletonList(11)); - verify(emitter, never()).execute(eq("0000:child-key-2"), any()); + verify(emitter).emitNormalGroup("0000", Collections.singletonList(11)); + verify(emitter, never()).emitDelayedGroup(any(), any()); // Act @@ -359,24 +389,28 @@ void ready_WhenSlotIsReadyInDelayedGroup_WithSuccessfulEmitTask_ShouldEmitThem() // - DelayedGroup("0000:child-key-2", DONE, slots:[Slot("child-key-2")]) // Assert - verify(emitter).execute("0000:child-key-2", Collections.singletonList(22)); + verify(emitter).emitDelayedGroup("0000:child-key-2", 22); } } @Test void ready_WhenSlotIsReadyInDelayedGroup_WithFailingEmitTask_ShouldFail() throws Exception { // Arrange - Emittable failingEmitter = + Emittable failingEmitter = spy( - // This should be an anonymous class since `spy()` can't handle a lambda. - new Emittable() { + new Emittable() { @Override - public void execute(String key, List values) { + public void emitNormalGroup(String parentKey, List values) { + throw new RuntimeException("Something is wrong"); + } + + @Override + public void emitDelayedGroup(String fullKey, Integer value) throws Exception { throw new RuntimeException("Something is wrong"); } }); - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(failingEmitter); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -396,8 +430,8 @@ public void execute(String key, List values) { // There should be the following groups at this moment. // - NormalGroup("0000", DONE, slots:[Slot(READY, "child-key-1")]) // - DelayedGroup("0000:child-key-2", SIZE-FIXED, slots:[Slot("child-key-2")]) - verify(failingEmitter).execute("0000", Collections.singletonList(11)); - verify(failingEmitter, never()).execute(eq("0000:child-key-2"), any()); + verify(failingEmitter).emitNormalGroup("0000", Collections.singletonList(11)); + verify(failingEmitter, never()).emitDelayedGroup(any(), any()); // Act @@ -415,7 +449,7 @@ public void execute(String key, List values) { // - DelayedGroup("0000:child-key-2", DONE, slots:[Slot("child-key-2")]) // Assert - verify(failingEmitter).execute("0000:child-key-2", Collections.singletonList(22)); + verify(failingEmitter).emitDelayedGroup("0000:child-key-2", 22); } } @@ -423,7 +457,7 @@ public void execute(String key, List values) { void remove_GivenKeyForOpenGroup_ShouldRemoveIt() throws Exception { // Arrange // `slotCapacity` is 3 to prevent the group from being size-fixed. - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(3, 100, 400)) { groupCommitter.setEmitter(emitter); @@ -444,14 +478,15 @@ void remove_GivenKeyForOpenGroup_ShouldRemoveIt() throws Exception { // The slots are already removed and these operations must fail. assertThrows(GroupCommitConflictException.class, () -> groupCommitter.ready(fullKey1, 42)); assertThrows(GroupCommitConflictException.class, () -> groupCommitter.ready(fullKey2, 42)); - verify(emitter, never()).execute(any(), any()); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); } } @Test void remove_GivenKeyForSizeFixedGroup_ShouldRemoveIt() throws Exception { // Arrange - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(emitter); @@ -472,7 +507,8 @@ void remove_GivenKeyForSizeFixedGroup_ShouldRemoveIt() throws Exception { // The slots are already removed and these operations must fail. assertThrows(GroupCommitConflictException.class, () -> groupCommitter.ready(fullKey1, 42)); assertThrows(GroupCommitConflictException.class, () -> groupCommitter.ready(fullKey2, 42)); - verify(emitter, never()).execute(any(), any()); + verify(emitter, never()).emitNormalGroup(any(), any()); + verify(emitter, never()).emitDelayedGroup(any(), any()); } } @@ -480,17 +516,21 @@ void remove_GivenKeyForSizeFixedGroup_ShouldRemoveIt() throws Exception { void remove_GivenKeyForReadyGroup_ShouldFail() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); - Emittable testableEmitter = + Emittable testableEmitter = spy( - // This should be an anonymous class since `spy()` can't handle a lambda. - new Emittable() { + new Emittable() { @Override - public void execute(String key, List values) throws Exception { + public void emitNormalGroup(String parentKey, List values) throws Exception { countDownLatch.await(); } + + @Override + public void emitDelayedGroup(String fullKey, Integer value) { + throw new AssertionError(); + } }); - try (GroupCommitter groupCommitter = + try (GroupCommitter groupCommitter = createGroupCommitter(2, 100, 400)) { groupCommitter.setEmitter(testableEmitter); ExecutorService executorService = Executors.newCachedThreadPool(); @@ -527,7 +567,8 @@ public void execute(String key, List values) throws Exception { } // There should be the following groups at this moment. // - NormalGroup("0000", DONE, slots:[Slot("child-key-1"), Slot("child-key-2")]) - verify(testableEmitter).execute("0000", Arrays.asList(11, 22)); + verify(testableEmitter).emitNormalGroup("0000", Arrays.asList(11, 22)); + verify(testableEmitter, never()).emitDelayedGroup(any(), any()); } } diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/GroupManagerTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/GroupManagerTest.java index b635167ba7..7eebad97a9 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/GroupManagerTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/GroupManagerTest.java @@ -21,9 +21,13 @@ class GroupManagerTest { private static final int TIMEOUT_CHECK_INTERVAL_MILLIS = 10; private TestableKeyManipulator keyManipulator; - @Mock private Emittable emittable; - @Mock private GroupSizeFixWorker groupSizeFixWorker; - @Mock private GroupCleanupWorker groupCleanupWorker; + @Mock private Emittable emittable; + + @Mock + private GroupSizeFixWorker groupSizeFixWorker; + + @Mock + private GroupCleanupWorker groupCleanupWorker; @BeforeEach void setUp() { @@ -37,7 +41,7 @@ void tearDown() {} void reserveNewSlot_WhenCurrentGroupDoesNotExist_ShouldCreateNewGroupAndReserveSlotInIt() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -61,7 +65,7 @@ void reserveNewSlot_WhenCurrentGroupDoesNotExist_ShouldCreateNewGroupAndReserveS void reserveNewSlot_WhenAvailableSlotsExistInCurrentGroup_ShouldReserveSlotInCurrentGroup() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -89,7 +93,7 @@ void reserveNewSlot_WhenAvailableSlotsExistInCurrentGroup_ShouldReserveSlotInCur void reserveNewSlot_WhenCurrentGroupIsSizeFixed_ShouldCreateNewGroupAndReserveSlotInIt() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -123,7 +127,7 @@ void reserveNewSlot_WhenCurrentGroupIsSizeFixed_ShouldCreateNewGroupAndReserveSl void getGroup_GivenKeyForNormalGroup_ShouldReturnProperly() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -145,18 +149,21 @@ void getGroup_GivenKeyForNormalGroup_ShouldReturnProperly() { // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) // Act - Group groupForKeys1 = groupManager.getGroup(keys1); - Group groupForKeys2 = groupManager.getGroup(keys2); - Group groupForKeys3 = groupManager.getGroup(keys3); + Group groupForKeys1 = + groupManager.getGroup(keys1); + Group groupForKeys2 = + groupManager.getGroup(keys2); + Group groupForKeys3 = + groupManager.getGroup(keys3); // Assert assertThat(groupForKeys1).isInstanceOf(NormalGroup.class); - NormalGroup normalGroupForKey1 = - (NormalGroup) groupForKeys1; + NormalGroup normalGroupForKey1 = + (NormalGroup) groupForKeys1; assertThat(groupForKeys3).isInstanceOf(NormalGroup.class); - NormalGroup normalGroupForKey3 = - (NormalGroup) groupForKeys3; + NormalGroup normalGroupForKey3 = + (NormalGroup) groupForKeys3; // The first 2 NormalGroups are the same since the capacity is 2. assertThat(normalGroupForKey1).isEqualTo(groupForKeys2); @@ -175,7 +182,7 @@ void getGroup_GivenKeyForDelayedGroup_ShouldReturnProperly() throws ExecutionException, InterruptedException { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -194,8 +201,8 @@ void getGroup_GivenKeyForDelayedGroup_ShouldReturnProperly() // These groups are supposed to exist at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); // Put a value to the slot to mark it as ready. Future future = @@ -220,8 +227,9 @@ void getGroup_GivenKeyForDelayedGroup_ShouldReturnProperly() assertThat(normalGroupForKey1.slots.size()).isEqualTo(1); assertThat(normalGroupForKey1.isDone()).isTrue(); - DelayedGroup delayedGroupForKey2 = - (DelayedGroup) groupManager.getGroup(keys2); + DelayedGroup delayedGroupForKey2 = + (DelayedGroup) + groupManager.getGroup(keys2); assertThat(delayedGroupForKey2.isSizeFixed()).isTrue(); assertThat(delayedGroupForKey2.isReady()).isFalse(); } @@ -230,7 +238,7 @@ void getGroup_GivenKeyForDelayedGroup_ShouldReturnProperly() void removeGroupFromMap_GivenKeyForNormalGroup_ShouldRemoveThemProperly() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -263,7 +271,7 @@ void removeGroupFromMap_GivenKeyForDelayedGroup_ShouldRemoveThemProperly() throws ExecutionException, InterruptedException { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -282,8 +290,8 @@ void removeGroupFromMap_GivenKeyForDelayedGroup_ShouldRemoveThemProperly() // These groups are supposed to exist at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); // Put a value to the slot to mark the slot as ready. Future future = @@ -299,7 +307,8 @@ void removeGroupFromMap_GivenKeyForDelayedGroup_ShouldRemoveThemProperly() // - NormalGroup("0000", READY, slots:[Slot(READY, "child-key-1")]) // - DelayedGroup("0000:child-key-2", SIZE-FIXED, slots:[Slot("child-key-2")]) - Group groupForKey2 = groupManager.getGroup(keys2); + Group groupForKey2 = + groupManager.getGroup(keys2); assertThat(groupForKey2).isInstanceOf(DelayedGroup.class); assertThat(groupForKey2.isSizeFixed()).isTrue(); assertThat(groupForKey2.isReady()).isFalse(); @@ -324,7 +333,7 @@ void removeGroupFromMap_GivenKeyForDelayedGroup_ShouldRemoveThemProperly() void removeSlotFromGroup_GivenKeyForSlotInNormalGroup_ShouldRemoveSlotFromThemProperly() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -345,8 +354,10 @@ void removeSlotFromGroup_GivenKeyForSlotInNormalGroup_ShouldRemoveSlotFromThemPr // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) // - NormalGroup("0001", OPEN, slots:[Slot("child-key-3")]) - Group groupForKeys1 = groupManager.getGroup(keys1); - Group groupForKeys3 = groupManager.getGroup(keys3); + Group groupForKeys1 = + groupManager.getGroup(keys1); + Group groupForKeys3 = + groupManager.getGroup(keys3); // Act // Assert @@ -372,7 +383,7 @@ void removeSlotFromGroup_GivenKeyForSlotInDelayedGroup_ShouldRemoveSlotFromThemP throws ExecutionException, InterruptedException { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -388,8 +399,8 @@ void removeSlotFromGroup_GivenKeyForSlotInDelayedGroup_ShouldRemoveSlotFromThemP // Add slot-2. Keys keys2 = keyManipulator.keysFromFullKey(groupManager.reserveNewSlot("child-key-2")); - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); Future future = executorService.submit(() -> normalGroupForKey1.putValueToSlotAndWait("child-key-1", 42)); executorService.shutdown(); @@ -405,7 +416,8 @@ void removeSlotFromGroup_GivenKeyForSlotInDelayedGroup_ShouldRemoveSlotFromThemP // These groups are supposed to exist at this moment. // - NormalGroup("0000", READY, slots:[Slot(READY, "child-key-1")]) // - DelayedGroup("0000:child-key-2", SIZE-FIXED, slots:[Slot("child-key-2")]) - Group groupForKey2 = groupManager.getGroup(keys2); + Group groupForKey2 = + groupManager.getGroup(keys2); assertThat(groupForKey2).isInstanceOf(DelayedGroup.class); assertThat(groupForKey2.isSizeFixed()).isTrue(); assertThat(groupForKey2.isReady()).isFalse(); @@ -430,7 +442,7 @@ void removeSlotFromGroup_GivenKeyForSlotInDelayedGroup_ShouldRemoveSlotFromThemP void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroup_ShouldKeepThem() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -444,8 +456,8 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroup_ShouldKeepThem() { // These groups are supposed to exist at this moment. // - NormalGroup("0000", OPEN, slots:[Slot("child-key-1")]) - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); // Act boolean moved = groupManager.moveDelayedSlotToDelayedGroup(normalGroupForKey1); @@ -461,7 +473,7 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT throws ExecutionException, InterruptedException { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(3, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -479,8 +491,8 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT // These groups are supposed to exist at this moment. // - NormalGroup("0000", OPEN, slots:[Slot("child-key-1"), Slot("child-key-2")]) - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); Future future = executorService.submit(() -> normalGroupForKey1.putValueToSlotAndWait("child-key-1", 42)); executorService.shutdown(); @@ -517,7 +529,7 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT void moveDelayedSlotToDelayedGroup_GivenKeyForSizeFixedGroupWithAllNotReadySlots_ShouldKeepThem() { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -533,8 +545,8 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT // These groups are supposed to exist at this moment. // - NormalGroup("0000", SIZE-FIXED, slots:[Slot("child-key-1"), Slot("child-key-2")]) - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); // Act boolean moved = groupManager.moveDelayedSlotToDelayedGroup(normalGroupForKey1); @@ -551,7 +563,7 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT throws InterruptedException, ExecutionException { // Arrange - GroupManager groupManager = + GroupManager groupManager = new GroupManager<>( new GroupCommitConfig(2, 100, 400, 60000, TIMEOUT_CHECK_INTERVAL_MILLIS), keyManipulator); @@ -567,8 +579,8 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT // Add slot-2. Keys keys2 = keyManipulator.keysFromFullKey(groupManager.reserveNewSlot("child-key-2")); - NormalGroup normalGroupForKey1 = - (NormalGroup) groupManager.getGroup(keys1); + NormalGroup normalGroupForKey1 = + (NormalGroup) groupManager.getGroup(keys1); Future future = executorService.submit(() -> normalGroupForKey1.putValueToSlotAndWait("child-key-1", 42)); executorService.shutdown(); @@ -590,8 +602,9 @@ void moveDelayedSlotToDelayedGroup_GivenKeyForOpenGroupWithReadySlot_ShouldKeepT assertThat(normalGroupForKey1.slots.size()).isEqualTo(1); assertThat(normalGroupForKey1.isDone()).isTrue(); - DelayedGroup delayedGroupForKey2 = - (DelayedGroup) groupManager.getGroup(keys2); + DelayedGroup delayedGroupForKey2 = + (DelayedGroup) + groupManager.getGroup(keys2); assertThat(delayedGroupForKey2.isSizeFixed()).isTrue(); assertThat(delayedGroupForKey2.isReady()).isFalse(); } diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/GroupSizeFixWorkerTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/GroupSizeFixWorkerTest.java index 5abee3c64e..0479d9b673 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/GroupSizeFixWorkerTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/GroupSizeFixWorkerTest.java @@ -23,15 +23,18 @@ class GroupSizeFixWorkerTest { private static final int LONG_WAIT_MILLIS = 500; @Mock - private DelayedSlotMoveWorker delayedSlotMoveWorker; - - @Mock private GroupCleanupWorker groupCleanupWorker; - @Mock private NormalGroup normalGroup1; - @Mock private NormalGroup normalGroup2; - @Mock private NormalGroup normalGroup3; - @Mock private NormalGroup normalGroup4; - private GroupSizeFixWorker worker; - private GroupSizeFixWorker workerWithWait; + private DelayedSlotMoveWorker + delayedSlotMoveWorker; + + @Mock + private GroupCleanupWorker groupCleanupWorker; + + @Mock private NormalGroup normalGroup1; + @Mock private NormalGroup normalGroup2; + @Mock private NormalGroup normalGroup3; + @Mock private NormalGroup normalGroup4; + private GroupSizeFixWorker worker; + private GroupSizeFixWorker workerWithWait; @BeforeEach void setUp() { diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/NormalGroupTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/NormalGroupTest.java index 4c8f980fce..e4090b392d 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/NormalGroupTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/NormalGroupTest.java @@ -1,12 +1,12 @@ package com.scalar.db.util.groupcommit; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.Uninterruptibles; +import com.scalar.db.util.ThrowableRunnable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -18,14 +18,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +@ExtendWith(MockitoExtension.class) class NormalGroupTest { - private Emittable emitter; + @Mock private Emittable emitter; private TestableKeyManipulator keyManipulator; @BeforeEach void setUp() { - emitter = (s, values) -> {}; // This generates parent keys which start with "0000" and increment by one for each subsequent // key ("0001", "0002"...). keyManipulator = new TestableKeyManipulator(); @@ -35,7 +38,7 @@ void setUp() { void parentKey_WithKeyManipulator_ShouldReturnProperly() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); // Act @@ -47,7 +50,7 @@ void parentKey_WithKeyManipulator_ShouldReturnProperly() { void fullKey_WithKeyManipulator_ShouldReturnProperly() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); // Act @@ -59,10 +62,10 @@ void fullKey_WithKeyManipulator_ShouldReturnProperly() { void reserveNewSlot_GivenArbitrarySlot_ShouldStoreIt() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); assertThat(group.size()).isNull(); assertThat(group.isSizeFixed()).isFalse(); @@ -79,6 +82,24 @@ void reserveNewSlot_GivenArbitrarySlot_ShouldStoreIt() { assertThat(group.isReady()).isFalse(); } + Emittable createEmitter(ThrowableRunnable task) { + return new Emittable() { + @Override + public void emitNormalGroup(String s, List values) { + try { + task.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void emitDelayedGroup(String s, Integer value) { + throw new AssertionError(); + } + }; + } + @Test void putValueToSlotAndWait_WithSuccessfulEmitTaskWithSingleSlot_ShouldExecuteTaskProperly() throws InterruptedException, ExecutionException { @@ -86,14 +107,15 @@ void putValueToSlotAndWait_WithSuccessfulEmitTaskWithSingleSlot_ShouldExecuteTas GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); AtomicBoolean emitted = new AtomicBoolean(); CountDownLatch wait = new CountDownLatch(1); - Emittable waitableEmitter = - (s, values) -> { - wait.await(); - emitted.set(true); - }; - NormalGroup group = + Emittable waitableEmitter = + createEmitter( + () -> { + wait.await(); + emitted.set(true); + }); + NormalGroup group = new NormalGroup<>(config, waitableEmitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); + Slot slot1 = new Slot<>("child-key-1", group); ExecutorService executorService = Executors.newCachedThreadPool(); group.reserveNewSlot(slot1); @@ -136,15 +158,16 @@ void putValueToSlotAndWait_WithSuccessfulEmitTask_ShouldExecuteTaskProperly() GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); AtomicBoolean emitted = new AtomicBoolean(); CountDownLatch wait = new CountDownLatch(1); - Emittable waitableEmitter = - (s, values) -> { - wait.await(); - emitted.set(true); - }; - NormalGroup group = + Emittable waitableEmitter = + createEmitter( + () -> { + wait.await(); + emitted.set(true); + }); + NormalGroup group = new NormalGroup<>(config, waitableEmitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); ExecutorService executorService = Executors.newCachedThreadPool(); group.reserveNewSlot(slot1); @@ -186,56 +209,6 @@ void putValueToSlotAndWait_WithSuccessfulEmitTask_ShouldExecuteTaskProperly() assertThat(emitted.get()).isTrue(); } - @Test - void putValueToSlotAndWait_WithFailingEmitTask_ShouldFail() { - // Arrange - GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - CountDownLatch wait = new CountDownLatch(1); - Emittable failingEmitter = - (s, values) -> { - wait.await(); - throw new RuntimeException("Something is wrong"); - }; - NormalGroup oldGroup = - new NormalGroup<>(config, failingEmitter, keyManipulator); - Slot slot = new Slot<>("child-key", oldGroup); - DelayedGroup group = - new DelayedGroup<>(config, "0000:full-key", failingEmitter, keyManipulator); - - ExecutorService executorService = Executors.newCachedThreadPool(); - - group.reserveNewSlot(slot); - - // Act - // Assert - - // Put value to the slots. - // Using different threads since calling putValueToSlotAndWait() will block the client thread - // until emitting. - List> futures = new ArrayList<>(); - futures.add( - executorService.submit( - () -> { - group.putValueToSlotAndWait(slot.key(), 42); - return null; - })); - executorService.shutdown(); - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - // The status is READY not DONE. - assertThat(group.isReady()).isTrue(); - assertThat(group.isDone()).isFalse(); - - // Resume the blocked emit task to move forward to DONE. - wait.countDown(); - for (Future future : futures) { - Throwable cause = assertThrows(ExecutionException.class, future::get).getCause(); - assertThat(cause).isInstanceOf(GroupCommitException.class); - } - - group.updateStatus(); - assertThat(group.isDone()).isTrue(); - } - @Test void removeNotReadySlots_WhenBothReadyAndNonReadySlotsExist_ShouldExecuteEmitTaskProperly() throws InterruptedException, ExecutionException { @@ -243,16 +216,17 @@ void removeNotReadySlots_WhenBothReadyAndNonReadySlotsExist_ShouldExecuteEmitTas GroupCommitConfig config = new GroupCommitConfig(2 + 1, 100, 1000, 60000, 20); AtomicBoolean emitted = new AtomicBoolean(); CountDownLatch wait = new CountDownLatch(1); - Emittable waitableEmitter = - (s, values) -> { - wait.await(); - emitted.set(true); - }; - NormalGroup group = + Emittable waitableEmitter = + createEmitter( + () -> { + wait.await(); + emitted.set(true); + }); + NormalGroup group = new NormalGroup<>(config, waitableEmitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); - Slot slot3 = new Slot<>("child-key-3", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); + Slot slot3 = new Slot<>("child-key-3", group); ExecutorService executorService = Executors.newCachedThreadPool(); group.reserveNewSlot(slot1); @@ -284,7 +258,8 @@ void removeNotReadySlots_WhenBothReadyAndNonReadySlotsExist_ShouldExecuteEmitTas // Assert // Remove the not-ready slot (slot3). - List> notReadySlots = group.removeNotReadySlots(); + List> notReadySlots = + group.removeNotReadySlots(); assertThat(notReadySlots).isNotNull().size().isEqualTo(1); assertThat(notReadySlots.get(0).isReady()).isFalse(); assertThat(notReadySlots.get(0).isDone()).isFalse(); @@ -308,10 +283,10 @@ void removeNotReadySlots_WhenBothReadyAndNonReadySlotsExist_ShouldExecuteEmitTas void removeNotReadySlots_WhenAllSlotsAreNotReady_ShouldRetainSlots() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); group.reserveNewSlot(slot1); group.reserveNewSlot(slot2); @@ -327,10 +302,10 @@ void removeNotReadySlots_WhenAllSlotsAreNotReady_ShouldRetainSlots() { void removeSlot_GivenNotReadySlot_ShouldRemoveSlotAndGetDone() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); group.reserveNewSlot(slot1); group.reserveNewSlot(slot2); @@ -352,10 +327,10 @@ void removeSlot_GivenNotReadySlot_ShouldRemoveSlotAndGetDone() { void removeSlot_GivenReadySlot_ShouldDoNothing() throws ExecutionException, InterruptedException { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot1 = new Slot<>("child-key-1", group); - Slot slot2 = new Slot<>("child-key-2", group); + Slot slot1 = new Slot<>("child-key-1", group); + Slot slot2 = new Slot<>("child-key-2", group); ExecutorService executorService = Executors.newCachedThreadPool(); group.reserveNewSlot(slot1); @@ -395,10 +370,12 @@ void removeSlot_GivenReadySlot_ShouldDoNothing() throws ExecutionException, Inte void abort_ShouldAbortSlot() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); - Slot slot1 = spy(new Slot<>("child-key-1", group)); - Slot slot2 = spy(new Slot<>("child-key-2", group)); + Slot slot1 = + spy(new Slot<>("child-key-1", group)); + Slot slot2 = + spy(new Slot<>("child-key-2", group)); group.reserveNewSlot(slot1); group.reserveNewSlot(slot2); @@ -415,7 +392,7 @@ void oldGroupAbortTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnProperl // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); long minOfCurrentTimeMillis = System.currentTimeMillis(); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); long maxOfCurrentTimeMillis = System.currentTimeMillis(); @@ -431,7 +408,7 @@ void groupSizeFixTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnProperly // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); long minOfCurrentTimeMillis = System.currentTimeMillis(); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); long maxOfCurrentTimeMillis = System.currentTimeMillis(); @@ -447,7 +424,7 @@ void delayedSlotMoveTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnPrope // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); long minOfCurrentTimeMillis = System.currentTimeMillis(); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); long maxOfCurrentTimeMillis = System.currentTimeMillis(); @@ -462,7 +439,7 @@ void delayedSlotMoveTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldReturnPrope void updateDelayedSlotMoveTimeoutAtMillis_GivenArbitraryTimeoutValue_ShouldUpdateProperly() { // Arrange GroupCommitConfig config = new GroupCommitConfig(2, 100, 1000, 60000, 20); - NormalGroup group = + NormalGroup group = new NormalGroup<>(config, emitter, keyManipulator); // Act diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/SlotTest.java b/core/src/test/java/com/scalar/db/util/groupcommit/SlotTest.java index 7f047f9385..9118869ba7 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/SlotTest.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/SlotTest.java @@ -21,9 +21,9 @@ @ExtendWith(MockitoExtension.class) class SlotTest { - private Slot slot; - @Mock private NormalGroup parentGroup; - @Mock private DelayedGroup newParentGroup; + private Slot slot; + @Mock private NormalGroup parentGroup; + @Mock private DelayedGroup newParentGroup; @BeforeEach void setUp() { @@ -33,7 +33,8 @@ void setUp() { @Test void key_WithArbitraryValue_ShouldReturnProperly() { // Arrange - Slot slot = new Slot<>("child-key", parentGroup); + Slot slot = + new Slot<>("child-key", parentGroup); // Act // Assert diff --git a/core/src/test/java/com/scalar/db/util/groupcommit/TestableKeyManipulator.java b/core/src/test/java/com/scalar/db/util/groupcommit/TestableKeyManipulator.java index e44ffcb96c..fe0be90040 100644 --- a/core/src/test/java/com/scalar/db/util/groupcommit/TestableKeyManipulator.java +++ b/core/src/test/java/com/scalar/db/util/groupcommit/TestableKeyManipulator.java @@ -4,7 +4,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -class TestableKeyManipulator implements KeyManipulator { +class TestableKeyManipulator implements KeyManipulator { private final AtomicInteger counter = new AtomicInteger(); @Override @@ -33,12 +33,12 @@ public Keys keysFromFullKey(String fullKey) { } @Override - public String emitKeyFromFullKey(String s) { + public String emitFullKeyFromFullKey(String s) { return s; } @Override - public String emitKeyFromParentKey(String s) { + public String emitParentKeyFromParentKey(String s) { return s; } }