Skip to content

Commit

Permalink
JavaDocs, comments, cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Ananev <[email protected]>
  • Loading branch information
artemananiev committed Mar 29, 2024
1 parent 238d515 commit 0091b86
Show file tree
Hide file tree
Showing 20 changed files with 273 additions and 593 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public TransactionRequestConfig get() {
final TransactionRequestConfig transactionRequestConfig = sequenceOfConfigs.get(transactionConfigIndex);
if (transactionRequestConfig.decrementAndGetAmount() == 0) {
this.transactionConfigIndex++;
logger.info(LogMarker.DEMO_INFO.getMarker(),
"Finished generating {} transactions", transactionRequestConfig.getType());
logger.info(
LogMarker.DEMO_INFO.getMarker(),
"Finished generating {} transactions",
transactionRequestConfig.getType());
}

return transactionRequestConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ public static <T extends MerkleNode> T testSynchronization(

final MerkleNode generatedTree = learner.getRoot();

// assertReconnectValidity(startingTree, desiredTree, generatedTree);
assertReconnectValidity(startingTree, desiredTree, generatedTree);

return (T) generatedTree;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
* @param reconnectMode
* Reconnect mode. Accepted values are "push" (teacher pushes lessons to learner, this is the model
* used for the rest of the merkle tree), "pullTopToBottom" (learner sends node requests to teacher
* starting from the root, rank by rank to leaves), and "pullTwoPhaseParents" (learner sends node
* requests to teacher starting from leaves and clean internal nodes).
* starting from the root, rank by rank to leaves), and "pullTwoPhasePessimistic" (learner sends node
* requests to teacher starting from clean leaf parent nodes and then leaves).
* @param reconnectFlushInterval
* During reconnect, virtual nodes are periodically flushed to disk after they are hashed. This
* interval indicates the number of nodes to hash before they are flushed to disk. If zero, all
Expand Down Expand Up @@ -95,9 +95,6 @@ public record VirtualMapConfig(
double percentHashThreads, // FUTURE WORK: We need to add min/max support for double values
@Min(-1) @ConfigProperty(defaultValue = "-1") int numHashThreads,
@Min(1) @Max(64) @ConfigProperty(defaultValue = "3") int virtualHasherChunkHeight,
// @ConfigProperty(defaultValue = "push") String reconnectMode,
// @ConfigProperty(defaultValue = "pullTopToBottom") String reconnectMode,
// @ConfigProperty(defaultValue = "pullTwoPhaseParents") String reconnectMode,
@ConfigProperty(defaultValue = "pullTwoPhasePessimistic") String reconnectMode,
@Min(0) @ConfigProperty(defaultValue = "500000") int reconnectFlushInterval,
@Min(0) @Max(100) @ConfigProperty(defaultValue = "25.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@
import com.swirlds.virtualmap.internal.reconnect.TeacherPullVirtualTreeView;
import com.swirlds.virtualmap.internal.reconnect.TeacherPushVirtualTreeView;
import com.swirlds.virtualmap.internal.reconnect.TopToBottomTraversalOrder;
import com.swirlds.virtualmap.internal.reconnect.TwoPhaseParentsTraversalOrder;
import com.swirlds.virtualmap.internal.reconnect.TwoPhasePessimisticTraversalOrder;
import com.swirlds.virtualmap.internal.reconnect.VirtualLearnerTreeView;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
Expand Down Expand Up @@ -1381,7 +1379,6 @@ public boolean isDetached() {
public TeacherTreeView<Long> buildTeacherView(final ReconnectConfig reconnectConfig) {
switch (config.reconnectMode()) {
case "pullTopToBottom":
case "pullTwoPhaseParents":
case "pullTwoPhasePessimistic":
return new TeacherPullVirtualTreeView<>(
getStaticThreadManager(), reconnectConfig, this, state, pipeline);
Expand Down Expand Up @@ -1462,31 +1459,14 @@ public LearnerTreeView<Long> buildLearnerView(final ReconnectConfig reconnectCon
final LearnerTreeView<Long> learnerTreeView;
switch (config.reconnectMode()) {
case "pullTopToBottom":
{
final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>(
reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover);
final NodeTraversalOrder order = new TopToBottomTraversalOrder(view);
view.setNodeTraveralOrder(order);
learnerTreeView = view;
}
break;
case "pullTwoPhaseParents":
{
final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>(
reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover);
final NodeTraversalOrder order = new TwoPhaseParentsTraversalOrder(view);
view.setNodeTraveralOrder(order);
learnerTreeView = view;
}
final NodeTraversalOrder topToBottom = new TopToBottomTraversalOrder();
learnerTreeView = new LearnerPullVirtualTreeView<>(
this, originalMap.records, originalState, reconnectState, nodeRemover, topToBottom);
break;
case "pullTwoPhasePessimistic":
{
final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>(
reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover);
final NodeTraversalOrder order = new TwoPhasePessimisticTraversalOrder(view);
view.setNodeTraveralOrder(order);
learnerTreeView = view;
}
final NodeTraversalOrder twoPhasePessimistic = new TwoPhasePessimisticTraversalOrder();
learnerTreeView = new LearnerPullVirtualTreeView<>(
this, originalMap.records, originalState, reconnectState, nodeRemover, twoPhasePessimistic);
break;
default:
logger.warn(RECONNECT.getMarker(), "Unknown reconnect mode: " + config.reconnectMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.swirlds.virtualmap.internal.reconnect;

import static com.swirlds.logging.legacy.LogMarker.RECONNECT;

import com.swirlds.common.io.streams.SerializableDataInputStream;
import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException;
import com.swirlds.common.threading.pool.StandardWorkGroup;
Expand All @@ -25,6 +27,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task running on the learner side, which is responsible for getting responses from the teacher.
*
* <p>The task keeps running as long as the corresponding {@link LearnerPullVirtualTreeSendTask}
* is alive, or some responses are expected from the teacher.
*
* <p>For every response from the teacher, the learner view is notified, which in turn notifies
* the current traversal order, so it can recalculate the next virtual path to request.
*/
public class LearnerPullVirtualTreeReceiveTask {

private static final Logger logger = LogManager.getLogger(LearnerPullVirtualTreeReceiveTask.class);
Expand All @@ -34,8 +45,15 @@ public class LearnerPullVirtualTreeReceiveTask {
private final StandardWorkGroup workGroup;
private final SerializableDataInputStream in;
private final VirtualLearnerTreeView view;

// Indicates if the learner sender task is done sending all requests to the teacher
private final AtomicBoolean senderIsFinished;

// Number of requests sent to teacher / responses expected from the teacher. Increased in
// the sending task, decreased in this task
private final AtomicLong expectedResponses;

// Indicates if a response for path 0 (virtual root node) has been received
private final CountDownLatch rootResponseReceived;

/**
Expand Down Expand Up @@ -77,9 +95,9 @@ private void run() {
while (!finished || responseExpected) {
if (responseExpected) {
final PullVirtualTreeResponse response = new PullVirtualTreeResponse(view);
// the learner tree is notified about the new response in deserialize() method below
response.deserialize(in, 0);
// logger.info(RECONNECT.getMarker(), "TOREMOVE Learner receive path: " + response.getPath());
// System.err.println("TOREMOVE Learner receive path: " + response.getPath());
logger.debug(RECONNECT.getMarker(), "Learner receive path: " + response.getPath());
if (response.getPath() == 0) {
rootResponseReceived.countDown();
}
Expand All @@ -91,8 +109,7 @@ private void run() {
finished = senderIsFinished.get();
responseExpected = expectedResponses.get() > 0;
}
// logger.info(RECONNECT.getMarker(), "TOREMOVE Learner receive done");
// System.err.println("TOREMOVE Learner receive done");
logger.debug(RECONNECT.getMarker(), "Learner receive done");
} catch (final Exception ex) {
throw new MerkleSynchronizationException("Exception in the learner's receiving task", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,22 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task running on the learner side, which is responsible for sending requests to the teacher.
*
* <p>The very first request to send is for path 0 (virtual root node). A response to this request
* is waited for before any other requests are sent, because root node response contains virtual
* tree path range on the teacher side.
*
* <p>After the root response has been received, this task keeps sending requests according to
* the provided {@link NodeTraversalOrder}. After the next path to request is {@link
* Path#INVALID_PATH}, this request is sent to indicate that there will be no more requests from
* the learner, and this task is finished.
*/
public class LearnerPullVirtualTreeSendTask {

private static final Logger logger = LogManager.getLogger(LearnerPullVirtualTreeSendTask.class);
Expand All @@ -39,9 +52,17 @@ public class LearnerPullVirtualTreeSendTask {
private final AsyncOutputStream<PullVirtualTreeRequest> out;
private final VirtualLearnerTreeView view;
private final NodeTraversalOrder traversalOrder;

// Indicates if the learner sender task is done sending all requests to the teacher
private final AtomicBoolean senderIsFinished;

// Indicates if a response for path 0 (virtual root node) has been received
private final CountDownLatch rootResponseReceived;

// Number of requests sent to teacher / responses expected from the teacher. Increased in
// this task, decreased in the receiving task
private final AtomicLong responsesExpected;

/**
* Create a thread for sending node requests to the teacher.
*
Expand All @@ -53,20 +74,25 @@ public class LearnerPullVirtualTreeSendTask {
* the view to be used when touching the merkle tree
* @param senderIsFinished
* becomes true once the sending thread has finished
* @param responsesExpected
* number of responses expected from the teacher, increased by one every time a request
* is sent
*/
public LearnerPullVirtualTreeSendTask(
final StandardWorkGroup workGroup,
final AsyncOutputStream<PullVirtualTreeRequest> out,
final VirtualLearnerTreeView view,
final NodeTraversalOrder traversalOrder,
final AtomicBoolean senderIsFinished,
final CountDownLatch rootResponseReceived) {
final CountDownLatch rootResponseReceived,
final AtomicLong responsesExpected) {
this.workGroup = workGroup;
this.out = out;
this.view = view;
this.traversalOrder = traversalOrder;
this.senderIsFinished = senderIsFinished;
this.rootResponseReceived = rootResponseReceived;
this.responsesExpected = responsesExpected;
}

void exec() {
Expand All @@ -75,17 +101,16 @@ void exec() {

private void run() {
try (out) {
// assuming root is always dirty
// Assuming root is always dirty. Root response will contain virtual tree path range
out.sendAsync(new PullVirtualTreeRequest(view, Path.ROOT_PATH, new Hash()));
view.anticipateMesssage();
responsesExpected.incrementAndGet();
if (!rootResponseReceived.await(60, TimeUnit.SECONDS)) {
throw new MerkleSynchronizationException("Timed out waiting for root node response from the teacher");
}

while (true) {
final long path = traversalOrder.getNextPathToSend();
// logger.info(RECONNECT.getMarker(), "TOREMOVE Learner send path: " + path);
// System.err.println("TOREMOVE Learner send path: " + path);
logger.debug(RECONNECT.getMarker(), "Learner send path: " + path);
if (path < Path.INVALID_PATH) {
Thread.onSpinWait();
continue;
Expand All @@ -95,10 +120,9 @@ private void run() {
if (path == Path.INVALID_PATH) {
break;
}
view.anticipateMesssage();
responsesExpected.incrementAndGet();
}
// logger.info(RECONNECT.getMarker(), "TOREMOVE Learner send done");
// System.err.println("TOREMOVE Learner send done");
logger.debug(RECONNECT.getMarker(), "Learner send done");
} catch (final InterruptedException ex) {
logger.warn(RECONNECT.getMarker(), "Learner's sending task interrupted");
Thread.currentThread().interrupt();
Expand Down
Loading

0 comments on commit 0091b86

Please sign in to comment.