diff --git a/platform-sdk/platform-apps/tests/PlatformTestingTool/src/main/java/com/swirlds/demo/virtualmerkle/config/SequentialTransactionRequestSupplier.java b/platform-sdk/platform-apps/tests/PlatformTestingTool/src/main/java/com/swirlds/demo/virtualmerkle/config/SequentialTransactionRequestSupplier.java index 4fa1f44564f9..561744d53424 100644 --- a/platform-sdk/platform-apps/tests/PlatformTestingTool/src/main/java/com/swirlds/demo/virtualmerkle/config/SequentialTransactionRequestSupplier.java +++ b/platform-sdk/platform-apps/tests/PlatformTestingTool/src/main/java/com/swirlds/demo/virtualmerkle/config/SequentialTransactionRequestSupplier.java @@ -46,8 +46,10 @@ public TransactionRequestConfig get() { final TransactionRequestConfig transactionRequestConfig = sequenceOfConfigs.get(transactionConfigIndex); if (transactionRequestConfig.decrementAndGetAmount() == 0) { this.transactionConfigIndex++; - logger.info(LogMarker.DEMO_INFO.getMarker(), - "Finished generating {} transactions", transactionRequestConfig.getType()); + logger.info( + LogMarker.DEMO_INFO.getMarker(), + "Finished generating {} transactions", + transactionRequestConfig.getType()); } return transactionRequestConfig; diff --git a/platform-sdk/swirlds-common/src/testFixtures/java/com/swirlds/common/test/fixtures/merkle/util/MerkleTestUtils.java b/platform-sdk/swirlds-common/src/testFixtures/java/com/swirlds/common/test/fixtures/merkle/util/MerkleTestUtils.java index 02006b627f42..3960f534d06d 100644 --- a/platform-sdk/swirlds-common/src/testFixtures/java/com/swirlds/common/test/fixtures/merkle/util/MerkleTestUtils.java +++ b/platform-sdk/swirlds-common/src/testFixtures/java/com/swirlds/common/test/fixtures/merkle/util/MerkleTestUtils.java @@ -1052,7 +1052,7 @@ public static T testSynchronization( final MerkleNode generatedTree = learner.getRoot(); -// assertReconnectValidity(startingTree, desiredTree, generatedTree); + assertReconnectValidity(startingTree, desiredTree, generatedTree); return (T) generatedTree; } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/config/VirtualMapConfig.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/config/VirtualMapConfig.java index ec425ed3c3ea..cf2e15f35c54 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/config/VirtualMapConfig.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/config/VirtualMapConfig.java @@ -42,8 +42,8 @@ * @param reconnectMode * Reconnect mode. Accepted values are "push" (teacher pushes lessons to learner, this is the model * used for the rest of the merkle tree), "pullTopToBottom" (learner sends node requests to teacher - * starting from the root, rank by rank to leaves), and "pullTwoPhaseParents" (learner sends node - * requests to teacher starting from leaves and clean internal nodes). + * starting from the root, rank by rank to leaves), and "pullTwoPhasePessimistic" (learner sends node + * requests to teacher starting from clean leaf parent nodes and then leaves). * @param reconnectFlushInterval * During reconnect, virtual nodes are periodically flushed to disk after they are hashed. This * interval indicates the number of nodes to hash before they are flushed to disk. If zero, all @@ -95,9 +95,6 @@ public record VirtualMapConfig( double percentHashThreads, // FUTURE WORK: We need to add min/max support for double values @Min(-1) @ConfigProperty(defaultValue = "-1") int numHashThreads, @Min(1) @Max(64) @ConfigProperty(defaultValue = "3") int virtualHasherChunkHeight, -// @ConfigProperty(defaultValue = "push") String reconnectMode, -// @ConfigProperty(defaultValue = "pullTopToBottom") String reconnectMode, -// @ConfigProperty(defaultValue = "pullTwoPhaseParents") String reconnectMode, @ConfigProperty(defaultValue = "pullTwoPhasePessimistic") String reconnectMode, @Min(0) @ConfigProperty(defaultValue = "500000") int reconnectFlushInterval, @Min(0) @Max(100) @ConfigProperty(defaultValue = "25.0") diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java index 4422dddfc7fe..e55ca9aa7de5 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/merkle/VirtualRootNode.java @@ -83,9 +83,7 @@ import com.swirlds.virtualmap.internal.reconnect.TeacherPullVirtualTreeView; import com.swirlds.virtualmap.internal.reconnect.TeacherPushVirtualTreeView; import com.swirlds.virtualmap.internal.reconnect.TopToBottomTraversalOrder; -import com.swirlds.virtualmap.internal.reconnect.TwoPhaseParentsTraversalOrder; import com.swirlds.virtualmap.internal.reconnect.TwoPhasePessimisticTraversalOrder; -import com.swirlds.virtualmap.internal.reconnect.VirtualLearnerTreeView; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; @@ -1381,7 +1379,6 @@ public boolean isDetached() { public TeacherTreeView buildTeacherView(final ReconnectConfig reconnectConfig) { switch (config.reconnectMode()) { case "pullTopToBottom": - case "pullTwoPhaseParents": case "pullTwoPhasePessimistic": return new TeacherPullVirtualTreeView<>( getStaticThreadManager(), reconnectConfig, this, state, pipeline); @@ -1462,31 +1459,14 @@ public LearnerTreeView buildLearnerView(final ReconnectConfig reconnectCon final LearnerTreeView learnerTreeView; switch (config.reconnectMode()) { case "pullTopToBottom": - { - final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>( - reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover); - final NodeTraversalOrder order = new TopToBottomTraversalOrder(view); - view.setNodeTraveralOrder(order); - learnerTreeView = view; - } - break; - case "pullTwoPhaseParents": - { - final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>( - reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover); - final NodeTraversalOrder order = new TwoPhaseParentsTraversalOrder(view); - view.setNodeTraveralOrder(order); - learnerTreeView = view; - } + final NodeTraversalOrder topToBottom = new TopToBottomTraversalOrder(); + learnerTreeView = new LearnerPullVirtualTreeView<>( + this, originalMap.records, originalState, reconnectState, nodeRemover, topToBottom); break; case "pullTwoPhasePessimistic": - { - final VirtualLearnerTreeView view = new LearnerPullVirtualTreeView<>( - reconnectConfig, this, originalMap.records, originalState, reconnectState, nodeRemover); - final NodeTraversalOrder order = new TwoPhasePessimisticTraversalOrder(view); - view.setNodeTraveralOrder(order); - learnerTreeView = view; - } + final NodeTraversalOrder twoPhasePessimistic = new TwoPhasePessimisticTraversalOrder(); + learnerTreeView = new LearnerPullVirtualTreeView<>( + this, originalMap.records, originalState, reconnectState, nodeRemover, twoPhasePessimistic); break; default: logger.warn(RECONNECT.getMarker(), "Unknown reconnect mode: " + config.reconnectMode()); diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeReceiveTask.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeReceiveTask.java index ea43dcd53dff..94129254cdbc 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeReceiveTask.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeReceiveTask.java @@ -16,6 +16,8 @@ package com.swirlds.virtualmap.internal.reconnect; +import static com.swirlds.logging.legacy.LogMarker.RECONNECT; + import com.swirlds.common.io.streams.SerializableDataInputStream; import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException; import com.swirlds.common.threading.pool.StandardWorkGroup; @@ -25,6 +27,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * A task running on the learner side, which is responsible for getting responses from the teacher. + * + *

The task keeps running as long as the corresponding {@link LearnerPullVirtualTreeSendTask} + * is alive, or some responses are expected from the teacher. + * + *

For every response from the teacher, the learner view is notified, which in turn notifies + * the current traversal order, so it can recalculate the next virtual path to request. + */ public class LearnerPullVirtualTreeReceiveTask { private static final Logger logger = LogManager.getLogger(LearnerPullVirtualTreeReceiveTask.class); @@ -34,8 +45,15 @@ public class LearnerPullVirtualTreeReceiveTask { private final StandardWorkGroup workGroup; private final SerializableDataInputStream in; private final VirtualLearnerTreeView view; + + // Indicates if the learner sender task is done sending all requests to the teacher private final AtomicBoolean senderIsFinished; + + // Number of requests sent to teacher / responses expected from the teacher. Increased in + // the sending task, decreased in this task private final AtomicLong expectedResponses; + + // Indicates if a response for path 0 (virtual root node) has been received private final CountDownLatch rootResponseReceived; /** @@ -77,9 +95,9 @@ private void run() { while (!finished || responseExpected) { if (responseExpected) { final PullVirtualTreeResponse response = new PullVirtualTreeResponse(view); + // the learner tree is notified about the new response in deserialize() method below response.deserialize(in, 0); - // logger.info(RECONNECT.getMarker(), "TOREMOVE Learner receive path: " + response.getPath()); -// System.err.println("TOREMOVE Learner receive path: " + response.getPath()); + logger.debug(RECONNECT.getMarker(), "Learner receive path: " + response.getPath()); if (response.getPath() == 0) { rootResponseReceived.countDown(); } @@ -91,8 +109,7 @@ private void run() { finished = senderIsFinished.get(); responseExpected = expectedResponses.get() > 0; } - // logger.info(RECONNECT.getMarker(), "TOREMOVE Learner receive done"); -// System.err.println("TOREMOVE Learner receive done"); + logger.debug(RECONNECT.getMarker(), "Learner receive done"); } catch (final Exception ex) { throw new MerkleSynchronizationException("Exception in the learner's receiving task", ex); } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeSendTask.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeSendTask.java index 090f197e0b96..e7cc89244ce5 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeSendTask.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeSendTask.java @@ -26,9 +26,22 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +/** + * A task running on the learner side, which is responsible for sending requests to the teacher. + * + *

The very first request to send is for path 0 (virtual root node). A response to this request + * is waited for before any other requests are sent, because root node response contains virtual + * tree path range on the teacher side. + * + *

After the root response has been received, this task keeps sending requests according to + * the provided {@link NodeTraversalOrder}. After the next path to request is {@link + * Path#INVALID_PATH}, this request is sent to indicate that there will be no more requests from + * the learner, and this task is finished. + */ public class LearnerPullVirtualTreeSendTask { private static final Logger logger = LogManager.getLogger(LearnerPullVirtualTreeSendTask.class); @@ -39,9 +52,17 @@ public class LearnerPullVirtualTreeSendTask { private final AsyncOutputStream out; private final VirtualLearnerTreeView view; private final NodeTraversalOrder traversalOrder; + + // Indicates if the learner sender task is done sending all requests to the teacher private final AtomicBoolean senderIsFinished; + + // Indicates if a response for path 0 (virtual root node) has been received private final CountDownLatch rootResponseReceived; + // Number of requests sent to teacher / responses expected from the teacher. Increased in + // this task, decreased in the receiving task + private final AtomicLong responsesExpected; + /** * Create a thread for sending node requests to the teacher. * @@ -53,6 +74,9 @@ public class LearnerPullVirtualTreeSendTask { * the view to be used when touching the merkle tree * @param senderIsFinished * becomes true once the sending thread has finished + * @param responsesExpected + * number of responses expected from the teacher, increased by one every time a request + * is sent */ public LearnerPullVirtualTreeSendTask( final StandardWorkGroup workGroup, @@ -60,13 +84,15 @@ public LearnerPullVirtualTreeSendTask( final VirtualLearnerTreeView view, final NodeTraversalOrder traversalOrder, final AtomicBoolean senderIsFinished, - final CountDownLatch rootResponseReceived) { + final CountDownLatch rootResponseReceived, + final AtomicLong responsesExpected) { this.workGroup = workGroup; this.out = out; this.view = view; this.traversalOrder = traversalOrder; this.senderIsFinished = senderIsFinished; this.rootResponseReceived = rootResponseReceived; + this.responsesExpected = responsesExpected; } void exec() { @@ -75,17 +101,16 @@ void exec() { private void run() { try (out) { - // assuming root is always dirty + // Assuming root is always dirty. Root response will contain virtual tree path range out.sendAsync(new PullVirtualTreeRequest(view, Path.ROOT_PATH, new Hash())); - view.anticipateMesssage(); + responsesExpected.incrementAndGet(); if (!rootResponseReceived.await(60, TimeUnit.SECONDS)) { throw new MerkleSynchronizationException("Timed out waiting for root node response from the teacher"); } while (true) { final long path = traversalOrder.getNextPathToSend(); - // logger.info(RECONNECT.getMarker(), "TOREMOVE Learner send path: " + path); -// System.err.println("TOREMOVE Learner send path: " + path); + logger.debug(RECONNECT.getMarker(), "Learner send path: " + path); if (path < Path.INVALID_PATH) { Thread.onSpinWait(); continue; @@ -95,10 +120,9 @@ private void run() { if (path == Path.INVALID_PATH) { break; } - view.anticipateMesssage(); + responsesExpected.incrementAndGet(); } - // logger.info(RECONNECT.getMarker(), "TOREMOVE Learner send done"); -// System.err.println("TOREMOVE Learner send done"); + logger.debug(RECONNECT.getMarker(), "Learner send done"); } catch (final InterruptedException ex) { logger.warn(RECONNECT.getMarker(), "Learner's sending task interrupted"); Thread.currentThread().interrupt(); diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeView.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeView.java index 91ec08ce4507..263f53564deb 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeView.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/LearnerPullVirtualTreeView.java @@ -16,19 +16,15 @@ package com.swirlds.virtualmap.internal.reconnect; -import static com.swirlds.logging.legacy.LogMarker.RECONNECT; import static com.swirlds.virtualmap.internal.Path.ROOT_PATH; import com.swirlds.common.crypto.CryptographyHolder; -import com.swirlds.common.crypto.DigestType; import com.swirlds.common.crypto.Hash; import com.swirlds.common.io.streams.MerkleDataInputStream; import com.swirlds.common.io.streams.MerkleDataOutputStream; import com.swirlds.common.io.streams.SerializableDataInputStream; import com.swirlds.common.merkle.MerkleNode; import com.swirlds.common.merkle.synchronization.LearningSynchronizer; -import com.swirlds.common.merkle.synchronization.config.ReconnectConfig; -import com.swirlds.common.merkle.synchronization.streams.AsyncInputStream; import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream; import com.swirlds.common.merkle.synchronization.task.ExpectedLesson; import com.swirlds.common.merkle.synchronization.task.ReconnectNodeCount; @@ -43,7 +39,6 @@ import com.swirlds.virtualmap.internal.VirtualStateAccessor; import com.swirlds.virtualmap.internal.merkle.VirtualRootNode; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -59,6 +54,9 @@ * This implementation uses {@link Long} as the representation of a node and corresponds directly * to the path of the node. * + *

This implementation is supposed to work with {@link TeacherPullVirtualTreeView} on the + * teacher side. + * * @param * The key * @param @@ -75,8 +73,6 @@ public final class LearnerPullVirtualTreeView originalRecords; - private NodeTraversalOrder traversalOrder; - - private final AtomicLong expectedResponses = new AtomicLong(0); + /** + * Node traversal order. Defines the order in which node requests will be sent to the teacher. + */ + private final NodeTraversalOrder traversalOrder; - private boolean firstNode = true; + /** + * Indicates if no responses from the teacher have been received yet. The very first response + * must be for path 0 (root virtual node) + */ + private boolean firstNodeResponse = true; /** * True until we have handled our first leaf @@ -120,20 +121,15 @@ public final class LearnerPullVirtualTreeView root, final RecordAccessor originalRecords, final VirtualStateAccessor originalState, final VirtualStateAccessor reconnectState, - final ReconnectNodeRemover nodeRemover) { + final ReconnectNodeRemover nodeRemover, + final NodeTraversalOrder traversalOrder) { super(root, originalState, reconnectState); - this.reconnectConfig = reconnectConfig; this.originalRecords = Objects.requireNonNull(originalRecords); this.nodeRemover = nodeRemover; - } - - @Override - public void setNodeTraveralOrder(final NodeTraversalOrder traversalOrder) { this.traversalOrder = traversalOrder; } @@ -154,6 +150,7 @@ public void startLearnerTasks( final AtomicBoolean senderIsFinished = new AtomicBoolean(); final CountDownLatch rootResponseReceived = new CountDownLatch(1); + final AtomicLong expectedResponses = new AtomicLong(0); final LearnerPullVirtualTreeReceiveTask learnerReceiveTask = new LearnerPullVirtualTreeReceiveTask( workGroup, inputStream, this, senderIsFinished, expectedResponses, rootResponseReceived); @@ -161,7 +158,7 @@ public void startLearnerTasks( reconstructedRoot.set(0L); assert traversalOrder != null; final LearnerPullVirtualTreeSendTask learnerSendTask = new LearnerPullVirtualTreeSendTask( - workGroup, out, this, traversalOrder, senderIsFinished, rootResponseReceived); + workGroup, out, this, traversalOrder, senderIsFinished, rootResponseReceived, expectedResponses); learnerSendTask.exec(); } @@ -171,23 +168,24 @@ private boolean isLeaf(long path) { } @Override - public void readNode(final SerializableDataInputStream in, final long path, final boolean isClean) throws IOException { + public void readNode(final SerializableDataInputStream in, final long path, final boolean isClean) + throws IOException { if (path == Path.ROOT_PATH) { final long firstLeafPath = in.readLong(); final long lastLeafPath = in.readLong(); - if (firstNode) { + if (firstNodeResponse) { reconnectState.setFirstLeafPath(firstLeafPath); reconnectState.setLastLeafPath(lastLeafPath); root.prepareReconnectHashing(firstLeafPath, lastLeafPath); nodeRemover.setPathInformation(firstLeafPath, lastLeafPath); traversalOrder.start(firstLeafPath, lastLeafPath, nodeCount); - firstNode = false; + firstNodeResponse = false; if (lastLeafPath <= 0) { return; } } } - assert !firstNode : "Root node must be the first node received from the teacher"; + assert !firstNodeResponse : "Root node must be the first node received from the teacher"; final boolean isLeaf = isLeaf(path); traversalOrder.nodeReceived(path, isClean); if (isLeaf) { @@ -203,19 +201,6 @@ public void readNode(final SerializableDataInputStream in, final long path, fina } } - @Override - public void anticipateMesssage() { - expectedResponses.incrementAndGet(); - } - - @Override - public void applySendBackpressure() throws InterruptedException { -// final long t = expectedResponses.get(); -// if (t > 4096) { -// Thread.sleep(1); -// } - } - /** * {@inheritDoc} */ diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/NodeTraversalOrder.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/NodeTraversalOrder.java index bc5dfc405af1..fbe98818b0c6 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/NodeTraversalOrder.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/NodeTraversalOrder.java @@ -16,18 +16,10 @@ package com.swirlds.virtualmap.internal.reconnect; -import com.swirlds.common.crypto.Hash; import com.swirlds.common.merkle.synchronization.task.ReconnectNodeCount; public interface NodeTraversalOrder { - /** - * Used as a return value from {@link #getNextPathToSend()} to indicate there is no path to send - * to the teacher yet. The sending thread should wait and call {@link #getNextPathToSend()} again - * later. - */ - public static final long PATH_NOT_AVAILABLE_YET = -2; - /** * This method is called when the first node, which is always virtual root node, is received from * teacher along with information about virtual tree leaves range. @@ -41,24 +33,17 @@ public interface NodeTraversalOrder { /** * Called by the learner's sending thread to send the next path to teacher. If this method returns * {@link com.swirlds.virtualmap.internal.Path#INVALID_PATH}, it indicates there are no more paths - * to send. Other negative values like {@link #PATH_NOT_AVAILABLE_YET} indicate that there is no - * path to send yet, but there will be some in the future, so this method needs to be called again - * later. - * - *

This method is responsible for backpressure, if any kind of it is supported by this - * traversal strategy. Typical implementation includes calling to {@link - * VirtualLearnerTreeView#applySendBackpressure()}, which slows down the current thread based on - * how many requests are sent to teacher and how many responses are received. + * to send. * - * @return next virtual path to send to teacher + * @return the next virtual path to send to the teacher * @throws InterruptedException if the current thread is interrupted while backpressure waiting */ long getNextPathToSend() throws InterruptedException; /** - * Notifies this object that a node response is received from teacher. + * Notifies this object that a node response is received from the teacher. * - * @param path received node path + * @param path the received node path * @param isClean indicates if the node at the given path matches the corresponding node on the teacher */ void nodeReceived(final long path, final boolean isClean); diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeRequest.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeRequest.java index b7f4ca118dd1..6eaac7aa017f 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeRequest.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeRequest.java @@ -25,7 +25,11 @@ import java.io.IOException; /** - * Used during the synchronization protocol to send data needed to reconstruct a single node. + * Used during the synchronization protocol to send data needed to reconstruct a single virtual node. + * + *

On the learner side, a request is created with a path and a hash in the old learner + * tree (if exists), then sent to the teacher. On the teacher side, requests are deserialized + * from the stream, and for every request a response is sent back to the learner. */ public class PullVirtualTreeRequest implements SelfSerializable { @@ -41,9 +45,13 @@ private static class ClassVersion { // Only used on the learner side private final VirtualLearnerTreeView learnerView; - private long path = -1; + // Virtual node path. If the path is Path.INVALID_PATH, it indicates that the learner will + // not send any more node requests to the teacher + private long path; - private Hash hash = null; + // Virtual node hash. If a node with the given path does not exists on the learner (path is + // outside of range), NULL_HASH is used. If the path is Path.INVALID_PATH, the hash is null + private Hash hash; /** * Zero-arg constructor for constructable registry. @@ -67,6 +75,7 @@ public PullVirtualTreeRequest(final VirtualTeacherTreeView teacherView) { public PullVirtualTreeRequest(final VirtualLearnerTreeView learnerTreeView, final long path, final Hash hash) { this.teacherView = null; this.learnerView = learnerTreeView; + // Null hash for the terminating requests, non-null otherwise assert path == Path.INVALID_PATH || (path >= 0 && hash != null); this.path = path; this.hash = hash; diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeResponse.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeResponse.java index 555b838a478b..25274a1bb6d1 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeResponse.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/PullVirtualTreeResponse.java @@ -24,7 +24,14 @@ import java.io.IOException; /** - * Used during the synchronization protocol to send data needed to reconstruct a single node. + * Used during the synchronization protocol to send data needed to reconstruct a single virtual node. + * + *

The teacher sends one response for every {@link PullVirtualTreeRequest} received from the + * learner. Every response includes a path followed by an integer flag that indicates if the node + * is clear (value 0, node hash on the teacher is the same as sent by the learner), or not (non-zero + * value). If the path corresponds to a leaf node, and the node is not clear, a {@link + * com.swirlds.virtualmap.datasource.VirtualLeafRecord} for the node is included in the end of the + * response. */ public class PullVirtualTreeResponse implements SelfSerializable { @@ -40,10 +47,12 @@ private static class ClassVersion { // Only used on the learner side private final VirtualLearnerTreeView learnerView; - private long path = -1; + // Virtual node path + private long path; - // Only used on the teacher side - private Hash hash; + // Virtual node hash on the learner side. May be NULL_HASH, if the path is outside of path range + // in the old learner virtual tree + private Hash originalHash; /** * Zero-arg constructor for constructable registry. @@ -60,7 +69,8 @@ public PullVirtualTreeResponse(final VirtualTeacherTreeView teacherView, final P this.teacherView = teacherView; this.learnerView = null; this.path = request.getPath(); - this.hash = request.getHash(); + this.originalHash = request.getHash(); + assert originalHash != null; } /** @@ -82,10 +92,11 @@ public void serialize(final SerializableDataOutputStream out) throws IOException assert teacherView != null; out.writeLong(path); Hash teacherHash = teacherView.loadHash(path); + // The only valid scenario, when teacherHash may be null, is the empty tree if ((teacherHash == null) && (path != 0)) { throw new MerkleSerializationException("Cannot load node hash (bad request from learner?), path = " + path); } - final boolean isClean = (teacherHash == null) || teacherHash.equals(hash); + final boolean isClean = (teacherHash == null) || teacherHash.equals(originalHash); out.write(isClean ? 0 : 1); teacherView.writeNode(out, path, isClean); } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java index 8919c3b000f8..b0c7af7d1538 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java @@ -28,12 +28,15 @@ import com.swirlds.common.utility.throttle.RateLimiter; import com.swirlds.virtualmap.internal.Path; import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** - * This class encapsulates all logic for the teacher's sending thread. + * A task running on the teacher side, which is responsible for processing requests from the + * learner. For every request, a response is sent to the provided async output stream. Async + * streams serialize objects to the underlying output streams in a separate thread. This is + * where the provided hash from the learner is compared with the corresponding hash on the + * teacher. */ public class TeacherPullVirtualTreeReceiveTask { @@ -49,8 +52,6 @@ public class TeacherPullVirtualTreeReceiveTask { private final RateLimiter rateLimiter; private final int sleepNanos; - private final AtomicBoolean allRequestsReceived; - /** * Create new thread that will send data lessons and queries for a subtree. * @@ -67,13 +68,11 @@ public TeacherPullVirtualTreeReceiveTask( final StandardWorkGroup workGroup, final SerializableDataInputStream in, final AsyncOutputStream out, - final VirtualTeacherTreeView view, - final AtomicBoolean allRequestsReceived) { + final VirtualTeacherTreeView view) { this.workGroup = workGroup; this.in = in; this.out = out; this.view = view; - this.allRequestsReceived = allRequestsReceived; final int maxRate = reconnectConfig.teacherMaxNodesPerSecond(); if (maxRate > 0) { @@ -114,25 +113,22 @@ private void run() { rateLimit(); final PullVirtualTreeRequest request = new PullVirtualTreeRequest(view); request.deserialize(in, 0); -// logger.info(RECONNECT.getMarker(), "TOREMOVE Teacher receive path: " + request.getPath()); -// System.err.println("TOREMOVE Teacher receive path: " + request.getPath()); + logger.debug(RECONNECT.getMarker(), "Teacher receive path: " + request.getPath()); if (request.getPath() == Path.INVALID_PATH) { logger.info(RECONNECT.getMarker(), "Teacher receiver is complete as requested by the learner"); break; } -// view.registerRequest(request); final PullVirtualTreeResponse response = new PullVirtualTreeResponse(view, request); + // All real work is done in the async output thread. This call just registers a response + // and returns immediately out.sendAsync(response); } -// logger.info(RECONNECT.getMarker(), "TOREMOVE Teacher receive done"); -// System.err.println("TOREMOVE Teacher receive done"); + logger.debug(RECONNECT.getMarker(), "Teacher receive done"); } catch (final InterruptedException ex) { logger.warn(RECONNECT.getMarker(), "Teacher's receiving task is interrupted"); Thread.currentThread().interrupt(); } catch (final Exception ex) { throw new MerkleSynchronizationException("Exception in the teacher's receiving task", ex); - } finally { - allRequestsReceived.set(true); } } } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeSendTask.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeSendTask.java deleted file mode 100644 index 96adbc5a3a3a..000000000000 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeSendTask.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (C) 2021-2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.swirlds.virtualmap.internal.reconnect; - -import static com.swirlds.logging.legacy.LogMarker.RECONNECT; - -import com.swirlds.common.merkle.synchronization.config.ReconnectConfig; -import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream; -import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException; -import com.swirlds.common.threading.pool.StandardWorkGroup; -import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * This class encapsulates all logic for the teacher's sending task. - */ -public class TeacherPullVirtualTreeSendTask { - - private static final Logger logger = LogManager.getLogger(TeacherPullVirtualTreeSendTask.class); - - private static final String NAME = "reconnect-teacher-sender"; - - private final StandardWorkGroup workGroup; - private final AsyncOutputStream out; - private final VirtualTeacherTreeView view; - - private final AtomicBoolean allRequestsReceived; - - /** - * Create new thread that will send data lessons and queries for a subtree. - * - * @param reconnectConfig the configuration for reconnect - * @param workGroup the work group managing the reconnect - * @param out the output stream, this object is responsible for closing this object when finished - * class - * @param view an object that interfaces with the subtree - */ - public TeacherPullVirtualTreeSendTask( - @NonNull final ReconnectConfig reconnectConfig, - final StandardWorkGroup workGroup, - final AsyncOutputStream out, - final VirtualTeacherTreeView view, - final AtomicBoolean allRequestsReceived) { - this.workGroup = workGroup; - this.out = out; - this.view = view; - this.allRequestsReceived = allRequestsReceived; - } - - /** - * Start the thread that sends lessons and queries to the learner. - */ - void exec() { - workGroup.execute(NAME, this::run); - } - - /** - * This thread is responsible for sending lessons (and nested queries) to the learner. - */ - private void run() { - try (out) { - while (!allRequestsReceived.get() || view.hasPendingResponses()) { - final PullVirtualTreeResponse response = view.getNextResponse(); - if (response == null) { - Thread.onSpinWait(); - continue; - } -// logger.info(RECONNECT.getMarker(), "TOREMOVE Teacher send path: " + response.getPath()); -// System.err.println("TOREMOVE Teacher send path: " + response.getPath()); - out.sendAsync(response); - } -// logger.info(RECONNECT.getMarker(), "TOREMOVE Teacher send done"); -// System.err.println("TOREMOVE Teacher send done"); - } catch (final InterruptedException ex) { - logger.warn(RECONNECT.getMarker(), "Teacher's sending task is interrupted"); - Thread.currentThread().interrupt(); - } catch (final Exception ex) { - throw new MerkleSynchronizationException("Exception in the teacher's sending task", ex); - } - } -} diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeView.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeView.java index a33fd81b2086..2996a9a4891b 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeView.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeView.java @@ -26,7 +26,6 @@ import com.swirlds.common.io.streams.SerializableDataOutputStream; import com.swirlds.common.merkle.synchronization.TeachingSynchronizer; import com.swirlds.common.merkle.synchronization.config.ReconnectConfig; -import com.swirlds.common.merkle.synchronization.streams.AsyncInputStream; import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream; import com.swirlds.common.merkle.synchronization.task.TeacherSubtree; import com.swirlds.common.merkle.synchronization.views.TeacherTreeView; @@ -40,17 +39,22 @@ import com.swirlds.virtualmap.internal.merkle.VirtualRootNode; import com.swirlds.virtualmap.internal.pipeline.VirtualPipeline; import java.io.IOException; -import java.util.Deque; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** * An implementation of {@link TeacherTreeView} designed for virtual merkle trees. * + *

This learner tree view creates two tasks running in the provided work group. One task + * is responsible for sending requests to the teacher, the other one receives responses. Once + * both tasks are completed, the corresponding virtual map is fully synchronized with the + * teacher. + * + *

This implementation is supposed to work with {@link LearnerPullVirtualTreeView} on the + * learner side. + * * @param * The key * @param @@ -117,14 +121,9 @@ public void startTeacherTasks( teachingSynchronizer.buildOutputStream(workGroup, outputStream); out.start(); - final AtomicBoolean allRequestsReceived = new AtomicBoolean(false); - - final TeacherPullVirtualTreeReceiveTask teacherReceiveTask = new TeacherPullVirtualTreeReceiveTask( - time, reconnectConfig, workGroup, inputStream, out, this, allRequestsReceived); + final TeacherPullVirtualTreeReceiveTask teacherReceiveTask = + new TeacherPullVirtualTreeReceiveTask(time, reconnectConfig, workGroup, inputStream, out, this); teacherReceiveTask.exec(); -// final TeacherPullVirtualTreeSendTask teacherSendTask = new TeacherPullVirtualTreeSendTask( -// reconnectConfig, workGroup, out, this, allRequestsReceived); -// teacherSendTask.exec(); } private boolean isLeaf(final long path) { @@ -132,8 +131,8 @@ private boolean isLeaf(final long path) { } @Override - public void writeNode( - final SerializableDataOutputStream out, final long path, final boolean isClean) throws IOException { + public void writeNode(final SerializableDataOutputStream out, final long path, final boolean isClean) + throws IOException { checkValidNode(path, reconnectState); if (path == 0) { out.writeLong(reconnectState.getFirstLeafPath()); @@ -149,23 +148,6 @@ public Hash loadHash(final long path) { return records.findHash(path); } - private final Deque responses = new ConcurrentLinkedDeque<>(); - - @Override - public void registerRequest(final PullVirtualTreeRequest request) { - responses.addLast(new PullVirtualTreeResponse(this, request)); - } - - @Override - public boolean hasPendingResponses() { - return !responses.isEmpty(); - } - - @Override - public PullVirtualTreeResponse getNextResponse() { - return responses.pollFirst(); - } - /** * {@inheritDoc} */ diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPushVirtualTreeView.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPushVirtualTreeView.java index 107007f9b519..233423aad662 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPushVirtualTreeView.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPushVirtualTreeView.java @@ -171,7 +171,6 @@ public Long getRoot() { } private final AtomicLong processed = new AtomicLong(0); - private final AtomicLong skipped = new AtomicLong(0); /** * {@inheritDoc} diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TopToBottomTraversalOrder.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TopToBottomTraversalOrder.java index 07f89ecb9e8a..da2a9bc68565 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TopToBottomTraversalOrder.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TopToBottomTraversalOrder.java @@ -18,29 +18,38 @@ import static com.swirlds.virtualmap.internal.Path.ROOT_PATH; -import com.swirlds.common.crypto.Hash; import com.swirlds.common.merkle.synchronization.task.ReconnectNodeCount; import com.swirlds.virtualmap.internal.Path; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +/** + * Virtual node traversal policy, which starts sending requests from the root node and proceeds + * virtual tree rank by rank, top to bottom. Every node is checked to have a clean parent on + * its path to the root. If a clean parent is found, the node is skipped (not sent to the teacher). + * + *

Clean nodes are tracked in a concurrent set. This set is populated in {@link + * #nodeReceived(long, boolean)} on the receiving thread. Since teacher responses are always + * lagging behind learner requests, some clean nodes may be requested redundantly. That is, + * a request for a clean node is sent before a response for any of its clean parents is + * received from the teacher. + */ public class TopToBottomTraversalOrder implements NodeTraversalOrder { - private final VirtualLearnerTreeView view; - private ReconnectNodeCount nodeCount; private long reconnectFirstLeafPath; private long reconnectLastLeafPath; - // Path 0 is always sent first, before the order is checked + // Last sent path. Initialized to 0, since the root path is always sent first private long lastPath = 0; + // Clean node paths, as received from the teacher. Only internal paths are recorded here, + // there is no need to track clean leaves, since they don't have children. This set is + // populated on the receiving thread and queried on the sending thread private final Set cleanNodes = ConcurrentHashMap.newKeySet(); - public TopToBottomTraversalOrder(final VirtualLearnerTreeView view) { - this.view = view; - } + public TopToBottomTraversalOrder() {} @Override public void start(final long firstLeafPath, final long lastLeafPath, final ReconnectNodeCount nodeCount) { @@ -69,32 +78,33 @@ public void nodeReceived(final long path, final boolean isClean) { } @Override - public long getNextPathToSend() throws InterruptedException { - if (lastPath == 0) { - final int firstLeafRank = Path.getRank(reconnectFirstLeafPath); - if (firstLeafRank > 4) { - // Skip a 3/4 parent ranks above -// lastPath = Path.getRightGrandChildPath(0, firstLeafRank * 3 / 4); - } - } + public long getNextPathToSend() { + assert lastPath != Path.INVALID_PATH; long path = lastPath + 1; long result = skipCleanPaths(path); + // Find the highest clean path and skip all paths in its sub-tree. Repeat while ((result != Path.INVALID_PATH) && (result != path)) { path = result; result = skipCleanPaths(path); } - if (result != lastPath + 1) { -// System.err.println("Skipped: " + (result - lastPath - 1)); - } - view.applySendBackpressure(); return lastPath = result; } + /** + * For the given path, find the highest clean parent path on the way to the root. If such + * a clean parent is found, all paths in the parent's sub-tree at the same rank as the + * initial path are skipped, and the next path outside of the sub-tree is returned (it may + * also be clean, this is handled in the loop in {@link #getNextPathToSend()}. Is the + * sub-tree spans up to the last leaf path, this method returns Path.INVALID_PATH, which + * indicates there are no more requests to the teacher to send. If no clean parents are + * found, the initial path is returned. + */ private long skipCleanPaths(final long path) { assert path > 0; if (path > reconnectLastLeafPath) { return Path.INVALID_PATH; } + // Find the highest clean parent and its rank long parent = Path.getParentPath(path); long cleanParent = Path.INVALID_PATH; int parentRanksAbove = 1; @@ -112,6 +122,8 @@ private long skipCleanPaths(final long path) { // no clean parent found result = path; } else { + // If found, get the right-most path in parent's sub-tree at the initial rank + // and return the next path result = Path.getRightGrandChildPath(cleanParent, cleanParentRanksAbove) + 1; } assert result >= path; diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhaseParentsTraversalOrder.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhaseParentsTraversalOrder.java deleted file mode 100644 index cd9a0dd69b3b..000000000000 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhaseParentsTraversalOrder.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright (C) 2024 Hedera Hashgraph, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.swirlds.virtualmap.internal.reconnect; - -import static com.swirlds.virtualmap.internal.Path.ROOT_PATH; - -import com.swirlds.common.merkle.synchronization.task.ReconnectNodeCount; -import com.swirlds.virtualmap.internal.Path; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class TwoPhaseParentsTraversalOrder implements NodeTraversalOrder { - - private final VirtualLearnerTreeView view; - - private ReconnectNodeCount nodeCount; - - private long reconnectFirstLeafPath; - private long reconnectLastLeafPath; - - // This set is populated during phase 1 (if any) on the receiving thread. After phase 1 is - // complete, the set is used on the sending thread. No concurrent reads or writes, not a - // concurrent set - private final Set cleanNodes = ConcurrentHashMap.newKeySet(); - - // Number of parent node chunks processed in parallel in phase 1 - private int chunkCount; - - private int chunksTopRank; - - private Map chunkStartPaths; - private Map chunkWidths; - private Map chunkStartRanks; - - private int lastSentPathChunk; - private Map chunkNextPaths; - - // Used during phase 2 - private long lastLeafPath = Path.INVALID_PATH; - - public TwoPhaseParentsTraversalOrder(final VirtualLearnerTreeView view) { - this.view = view; - } - - @Override - public void start(final long firstLeafPath, final long lastLeafPath, final ReconnectNodeCount nodeCount) { - this.reconnectFirstLeafPath = firstLeafPath; - this.reconnectLastLeafPath = lastLeafPath; - this.nodeCount = nodeCount; - - final int leafParentRank = Path.getRank(firstLeafPath) - 1; - if (leafParentRank < 5) { - chunkCount = 0; - return; // no phase 1, just iterate over all leaves - } - - chunksTopRank = leafParentRank / 2; - chunkCount = 1 << chunksTopRank; - final int minChunkHeight = leafParentRank - chunksTopRank; - - final long firstPathInLeafParentRank = Path.getLeftGrandChildPath(0, leafParentRank); - - chunkStartPaths = new ConcurrentHashMap<>(chunkCount); - chunkWidths = new ConcurrentHashMap<>(chunkCount); - chunkStartRanks = new ConcurrentHashMap<>(chunkCount); - lastSentPathChunk = -1; - chunkNextPaths = new ConcurrentHashMap<>(chunkCount); - for (int i = 0; i < chunkCount; i++) { - final long p = firstPathInLeafParentRank + ((long) i << minChunkHeight); - if (Path.getLeftChildPath(p) + (2L << minChunkHeight) <= reconnectFirstLeafPath) { - chunkStartPaths.put(i, Path.getLeftChildPath(p)); - chunkWidths.put(i, 2L << minChunkHeight); - chunkStartRanks.put(i, leafParentRank + 1); - } else { - chunkStartPaths.put(i, p); - chunkWidths.put(i, 1L << minChunkHeight); - chunkStartRanks.put(i, leafParentRank); - } - chunkNextPaths.put(i, chunkStartPaths.get(i)); - } - } - - @Override - public void nodeReceived(final long path, final boolean isClean) { - final boolean isLeaf = path >= reconnectFirstLeafPath; - if (isLeaf) { - nodeCount.incrementLeafCount(); - if (isClean) { - nodeCount.incrementRedundantLeafCount(); - } - } else { - final int rank = Path.getRank(path); - if (path != 0) { - assert chunkCount > 0; - final int chunk = getPathChunk(path); - final int chunkStartRank = chunkStartRanks.get(chunk); - final long chunkNextPath = chunkNextPaths.get(chunk); - if (isClean) { - cleanNodes.add(path); - cleanNodes.remove(Path.getLeftChildPath(path)); - cleanNodes.remove(Path.getRightChildPath(path)); - if ((chunkNextPath > 0) && Path.isInSubTree(path, chunkNextPath)) { - final long lastCleanPath = Path.getRightGrandChildPath(path, chunkStartRank - rank); - chunkNextPaths.put(chunk, getNextPathInChunk(chunk, lastCleanPath)); - } - } else { - if ((chunkNextPath != -1) && Path.isInSubTree(chunkNextPath, path)) { - final long originAtChunkStartRank = Path.getLeftGrandChildPath(path, chunkStartRank - rank); - chunkNextPaths.put(chunk, skipCleanPaths(originAtChunkStartRank + 1, reconnectFirstLeafPath)); - } - } - } - nodeCount.incrementInternalCount(); - if (isClean) { - nodeCount.incrementRedundantInternalCount(); - } - } - } - - private Set sent = new HashSet<>(); - - @Override - public long getNextPathToSend() throws InterruptedException { - long result = -1; - if (lastLeafPath == -1) { - for (int i = 0; i < chunkCount; i++) { - final int chunk = (lastSentPathChunk + 1 + i) % chunkCount; - result = chunkNextPaths.get(chunk); - if (result == -1) { - continue; - } - // Skip the path, if it's clean, or it's right and its left sibling is clean (but parent is not) - if (hasCleanParent(result) || ((Path.isRight(result) && cleanNodes.contains(Path.getSiblingPath(result))))) { - final int rank = Path.getRank(result); - final int chunkStartRank = chunkStartRanks.get(chunk); - final long originAtChunkStartRank = Path.getLeftGrandChildPath(result, chunkStartRank - rank); - result = cleanOrNext(chunk, originAtChunkStartRank + 1); - } - if (result == -1) { - chunkNextPaths.put(chunk, result); - continue; - } - final long next = getNextPathInChunk(chunk, result); - chunkNextPaths.put(chunk, next); - lastSentPathChunk = chunk; - break; - } - } - if (result == -1) { - result = getNextLeafPath(); - } else { - view.applySendBackpressure(); - } - if (sent.contains(result)) { - System.err.println("Already sent: " + result); - } - sent.add(result); - return result; - } - - private long cleanOrNext(final int chunk, final long path) { - if (getPathChunk(path) != chunk) { - return -1; - } - return hasCleanParent(path) ? getNextPathInChunk(chunk, path) : path; - } - - private long getNextPathInChunk(final int chunk, final long lastPath) { - final int lastPathRank = Path.getRank(lastPath); - final int chunkStartRank = chunkStartRanks.get(chunk); - final int chunkHeight = chunkStartRank - chunksTopRank; - if (Path.isLeft(lastPath) && (chunkStartRank - lastPathRank < chunkHeight) && !hasCleanParent(lastPath)) { - return Path.getParentPath(lastPath); - } - // next path at chunk start rank - long path = Path.getLeftGrandChildPath(lastPath, chunkStartRank - lastPathRank) + 1; - final long chunkStartPath = chunkStartPaths.get(chunk); - final long chunkWidth = chunkWidths.get(chunk); - final long lastPathInChunk = chunkStartPath + chunkWidth - 1; - return skipCleanPaths(path, lastPathInChunk); - } - - private long getNextLeafPath() { - if (lastLeafPath == Path.INVALID_PATH) { -// System.err.println("Clean nodes: " + cleanNodes.size()); -// System.err.println("First leaf sent: " + System.currentTimeMillis()); - } - long path = lastLeafPath == Path.INVALID_PATH ? reconnectFirstLeafPath : lastLeafPath + 1; - if ((path > reconnectLastLeafPath) || (reconnectFirstLeafPath < 0)) { - return Path.INVALID_PATH; - } - long result = skipCleanPaths(path, reconnectLastLeafPath); - if ((lastLeafPath != -1) && (result != -1) && (result != lastLeafPath + 1)) { -// System.err.println("Skipped: " + (result - lastLeafPath - 1) + " = " + (lastLeafPath + 1) + " -> " + result); - } - assert (result == Path.INVALID_PATH) || (result >= reconnectFirstLeafPath); - return lastLeafPath = result; - } - - private int getPathChunk(long path) { - int rank = Path.getRank(path); - assert rank >= chunksTopRank; - final long pathAtTopRank = Path.getGrandParentPath(path, rank - chunksTopRank); - return (int) (pathAtTopRank - Path.getLeftGrandChildPath(0, chunksTopRank)); - } - - private boolean hasCleanParent(final long path) { - long parent = Path.getParentPath(path); - boolean clean = false; - while ((parent > 0) && !clean) { - clean = cleanNodes.contains(parent); - parent = Path.getParentPath(parent); - } - return clean; - } - - private long skipCleanPaths(long path, final long limit) { - long result = skipCleanPaths(path); - while ((result < limit) && (result != path)) { - path = result; - result = skipCleanPaths(path); - } - return (result <= limit) ? result : Path.INVALID_PATH; - } - - private long skipCleanPaths(final long path) { - assert path > 0; - long parent = Path.getParentPath(path); - long cleanParent = Path.INVALID_PATH; - int parentRanksAbove = 1; - int cleanParentRanksAbove = 1; - while (parent != ROOT_PATH) { - if (cleanNodes.contains(parent)) { - cleanParent = parent; - cleanParentRanksAbove = parentRanksAbove; - } - parentRanksAbove++; - parent = Path.getParentPath(parent); - } - final long result; - if (cleanParent == Path.INVALID_PATH) { - // no clean parent found - result = path; - } else { - result = Path.getRightGrandChildPath(cleanParent, cleanParentRanksAbove) + 1; - } - assert result >= path; - return result; - } -} diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhasePessimisticTraversalOrder.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhasePessimisticTraversalOrder.java index 8504a67bdb97..7a4daeae2cdb 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhasePessimisticTraversalOrder.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TwoPhasePessimisticTraversalOrder.java @@ -21,17 +21,42 @@ import com.swirlds.common.merkle.synchronization.task.ReconnectNodeCount; import com.swirlds.virtualmap.internal.Path; import java.util.Deque; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicReferenceArray; +/** + * Virtual node traversal policy to traverse the virtual tree bottom-up. It contains two phases, + * the first includes internal nodes, the second is for leaves. + * + *

The first phase. At the leaf parent rank (the rank of the first leaf path, minus 1) all + * internal nodes are split into chunks. Every chunk has a start rank (leaf parent rank, or + * first leaf rank for the part of the tree on top of the last rank leaves) and width (number + * of internal nodes in the chunk at its rank). For every chunk, its starting path is sent first, + * this is the first path at chunk's rank. + * + *

When a response for an internal node is received from the teacher: if the node is clean, + * and it's the left child, a request for its parent node is sent. If the parent is clean and + * left, a request is sent for the grand parent, and so on. If the node is dirty, the next + * request for this chunk is sent at its starting rank again, skipping all nodes in clear + * sub-trees. + * + *

Since responses are received independently of requests, it may happen that a request + * for every chunk are sent, but no responses are there yet. In this case, this class makes a + * pessimistic assumption that the previously sent path is dirty, and proceeds to the next + * path at chunk's starting rank. It may result in redundant nodes sent (a clean node is sent + * before a response for one of its clean parents is received), but it increases overall + * throughput. + * + *

The second phase starts when requests for all internal nodes in all chunks have been + * sent (but responses may not have been recorded). At this phase, all leaves are traversed + * from the first leaf path to the last leaf path, skipping paths with clean parents. At + * this step some clean leaves may be sent redundantly, too. + */ public class TwoPhasePessimisticTraversalOrder implements NodeTraversalOrder { - private final VirtualLearnerTreeView view; - private ReconnectNodeCount nodeCount; private long reconnectFirstLeafPath; @@ -42,22 +67,38 @@ public class TwoPhasePessimisticTraversalOrder implements NodeTraversalOrder { // Number of parent node chunks processed in parallel in phase 1 private int chunkCount; - private int chunksTopRank; + // The rank of top-most nodes in every chunk. For example, if chunks are started at + // rank 20, and there are 512 chunks, it means chunk height is 11, and chunk stop + // rank is 9. Top-most paths of all chunks are at rank 9 + private int chunksStopRank; + + // Start ranks for every chunk. A chunk may start at first leaf rank or first leaf rank - 1 + private AtomicReferenceArray chunkStartRanks; + + // Start path for every chunk. This path is on the chunk's start rank + private AtomicReferenceArray chunkStartPaths; - private Map chunkStartRanks; // first leaf rank or first leaf rank - 1 - private Map chunkStartPaths; - private Map chunkWidths; + // Chunk width, i.e. number of paths in the chunk, at the chunk's start rank + private AtomicReferenceArray chunkWidths; + // Index of the chunk, in which the last internal path at phase 1 was sent private int lastSentPathChunk; + + // For every chunk, a list of paths to check next. Initially this is a starting + // path for every chunk. Then this list may contain clean nodes' parents. If for a chunk + // this list is not empty, paths from the list are sent before "pessimistic" paths + // for this chunk private AtomicReferenceArray> chunkNextToCheckPaths; + + // For every chunk, the next "pessimistic" internal node in the chunk to check. These + // nodes are at chunk starting ranks. If such a path appears to be clean, it's skipped + // up to the next dirty path or the end of the chunk private Map chunkNextPessimisticPaths; // Used during phase 2 private long lastLeafPath = Path.INVALID_PATH; - public TwoPhasePessimisticTraversalOrder(final VirtualLearnerTreeView view) { - this.view = view; - } + public TwoPhasePessimisticTraversalOrder() {} @Override public void start(final long firstLeafPath, final long lastLeafPath, final ReconnectNodeCount nodeCount) { @@ -71,16 +112,18 @@ public void start(final long firstLeafPath, final long lastLeafPath, final Recon return; // no phase 1, just iterate over all leaves } - chunksTopRank = leafParentRank / 2; - chunkCount = 1 << chunksTopRank; - final int minChunkHeight = leafParentRank - chunksTopRank; + // Higher the stop rank, less number of chunks. Half of the tree height seems to work well + chunksStopRank = leafParentRank / 2; + chunkCount = 1 << chunksStopRank; + // Height of chunks starting from leaf parent rank. Chunks starting from first leaf rank + // will be of minChunkHeight + 1 height + final int minChunkHeight = leafParentRank - chunksStopRank; final long firstPathInLeafParentRank = Path.getLeftGrandChildPath(0, leafParentRank); - chunkStartPaths = new ConcurrentHashMap<>(chunkCount); - chunkWidths = new ConcurrentHashMap<>(chunkCount); - chunkStartRanks = new ConcurrentHashMap<>(chunkCount); - lastSentPathChunk = -1; + chunkStartPaths = new AtomicReferenceArray<>(chunkCount); + chunkWidths = new AtomicReferenceArray<>(chunkCount); + chunkStartRanks = new AtomicReferenceArray<>(chunkCount); chunkNextToCheckPaths = new AtomicReferenceArray<>(chunkCount); chunkNextPessimisticPaths = new ConcurrentHashMap<>(chunkCount); for (int i = 0; i < chunkCount; i++) { @@ -97,12 +140,14 @@ public void start(final long firstLeafPath, final long lastLeafPath, final Recon startPath = p; chunkWidth = 1L << minChunkHeight; } - chunkStartPaths.put(i, startPath); - chunkStartRanks.put(i, startRank); - chunkWidths.put(i, chunkWidth); + chunkStartPaths.set(i, startPath); + chunkStartRanks.set(i, startRank); + chunkWidths.set(i, chunkWidth); chunkNextToCheckPaths.set(i, new ConcurrentLinkedDeque<>()); chunkNextPessimisticPaths.put(i, chunkStartPaths.get(i)); } + + lastSentPathChunk = -1; } @Override @@ -119,12 +164,21 @@ public void nodeReceived(final long path, final boolean isClean) { final int chunk = getPathChunk(path); if (isClean) { cleanNodes.add(path); + // Keep cleanNodes lean. If a parent is clean, its children are clean, too, no + // need to keep them in the set cleanNodes.remove(Path.getLeftChildPath(path)); cleanNodes.remove(Path.getRightChildPath(path)); + // If clean and left, add the parent to the list of paths to check. Even if + // the parent is higher in the tree than chunkStopRank if ((path != 1) && Path.isLeft(path)) { chunkNextToCheckPaths.get(chunk).addFirst(Path.getParentPath(path)); } } else { + // At the chunk start rank, every other path (i.e. all right paths) is skipped by + // default. If a left sibling is clean, there is no need to check the right sibling, + // as a request for the parent will be sent anyway. However, if the left sibling + // is dirty, the right sibling may be either dirty, or clean, so a request for it + // should be sent final int chunkStartRank = chunkStartRanks.get(chunk); final int pathRank = Path.getRank(path); if ((pathRank == chunkStartRank) && Path.isLeft(path)) { @@ -139,14 +193,13 @@ public void nodeReceived(final long path, final boolean isClean) { } } - private final Set sent = new HashSet<>(); - @Override - public long getNextPathToSend() throws InterruptedException { + public long getNextPathToSend() { long result = -1; if (lastLeafPath == -1) { for (int i = 0; i < chunkCount; i++) { final int chunk = (lastSentPathChunk + 1 + i) % chunkCount; + // Check the queue first. If not empty, return a path from there (if not clean) final Deque toCheck = chunkNextToCheckPaths.get(chunk); result = toCheck.isEmpty() ? -1 : toCheck.pollFirst(); while ((result != -1) && hasCleanParent(result)) { @@ -156,6 +209,7 @@ public long getNextPathToSend() throws InterruptedException { lastSentPathChunk = chunk; break; } + // Otherwise proceed to the next pessimistic path at chunk start rank result = cleanOrNext(chunk, chunkNextPessimisticPaths.get(chunk)); if (result == -1) { chunkNextPessimisticPaths.put(chunk, -1L); @@ -172,13 +226,7 @@ public long getNextPathToSend() throws InterruptedException { } if (result == -1) { result = getNextLeafPath(); - } else { - view.applySendBackpressure(); - } - if (sent.contains(result)) { - System.err.println("Already sent: " + result); } - sent.add(result); return result; } @@ -192,7 +240,7 @@ private long cleanOrNext(final int chunk, final long path) { private long getNextPathInChunk(final int chunk, final long lastPath) { final int lastPathRank = Path.getRank(lastPath); final int chunkStartRank = chunkStartRanks.get(chunk); - final int chunkHeight = chunkStartRank - chunksTopRank; + final int chunkHeight = chunkStartRank - chunksStopRank; if (Path.isLeft(lastPath) && (chunkStartRank - lastPathRank < chunkHeight) && !hasCleanParent(lastPath)) { return Path.getParentPath(lastPath); } @@ -205,31 +253,24 @@ private long getNextPathInChunk(final int chunk, final long lastPath) { } private long getNextLeafPath() { - if (lastLeafPath == Path.INVALID_PATH) { -// System.err.println("Clean nodes: " + cleanNodes.size()); -// System.err.println("First leaf sent: " + System.currentTimeMillis()); - } long path = lastLeafPath == Path.INVALID_PATH ? reconnectFirstLeafPath : lastLeafPath + 1; if ((path > reconnectLastLeafPath) || (reconnectFirstLeafPath < 0)) { return Path.INVALID_PATH; } long result = skipCleanPaths(path, reconnectLastLeafPath); - if ((lastLeafPath != -1) && (result != -1) && (result != lastLeafPath + 1)) { -// System.err.println("Skipped: " + (result - lastLeafPath - 1) + " = " + (lastLeafPath + 1) + " -> " + result); - } assert (result == Path.INVALID_PATH) || (result >= reconnectFirstLeafPath); return lastLeafPath = result; } private int getPathChunk(long path) { int rank = Path.getRank(path); - if (rank < chunksTopRank) { + if (rank < chunksStopRank) { // This may happen if the whole chunk is clean - path = Path.getLeftGrandChildPath(path, chunksTopRank - rank); - rank = chunksTopRank; + path = Path.getLeftGrandChildPath(path, chunksStopRank - rank); + rank = chunksStopRank; } - final long pathAtTopRank = Path.getGrandParentPath(path, rank - chunksTopRank); - return (int) (pathAtTopRank - Path.getLeftGrandChildPath(0, chunksTopRank)); + final long pathAtTopRank = Path.getGrandParentPath(path, rank - chunksStopRank); + return (int) (pathAtTopRank - Path.getLeftGrandChildPath(0, chunksStopRank)); } private long getLastChunkPath(final int chunk) { diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualLearnerTreeView.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualLearnerTreeView.java index 6b7876bc6be1..0ddf9a724fe7 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualLearnerTreeView.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualLearnerTreeView.java @@ -22,12 +22,6 @@ public interface VirtualLearnerTreeView extends LearnerTreeView { - void setNodeTraveralOrder(final NodeTraversalOrder traversalOrder); - // Reads the node from the teacher void readNode(final SerializableDataInputStream in, final long path, final boolean isClean) throws IOException; - - void anticipateMesssage(); - - void applySendBackpressure() throws InterruptedException; } diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualReconnectUtils.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualReconnectUtils.java index 3a048e517e06..c099b3494866 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualReconnectUtils.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualReconnectUtils.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.swirlds.virtualmap.internal.reconnect; import java.io.IOException; diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualTeacherTreeView.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualTeacherTreeView.java index 3d94b34d1161..e505419da7db 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualTeacherTreeView.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/VirtualTeacherTreeView.java @@ -27,13 +27,6 @@ public interface VirtualTeacherTreeView extends TeacherTreeView { // state metadata (leaf paths) void writeNode(final SerializableDataOutputStream out, final long path, final boolean isClean) throws IOException; + // Returns virtual node hash for the specified path Hash loadHash(final long path); - - // Only used in async teaching pull model, when teacher sends responses in a different thread - // than receives requests - void registerRequest(final PullVirtualTreeRequest request); - - boolean hasPendingResponses(); - - PullVirtualTreeResponse getNextResponse() throws InterruptedException; }