diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/LearningSynchronizer.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/LearningSynchronizer.java index 266a87ff41da..8a80829dc2e4 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/LearningSynchronizer.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/LearningSynchronizer.java @@ -220,8 +220,13 @@ private List receiveTreesInParallel(final List toReceive new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, reconnectExceptionListener); final List> reconstructedRoots = new ArrayList<>(toReceive.size()); + final AsyncInputStream in = new AsyncInputStream(inputStream, workGroup, reconnectConfig); final AsyncOutputStream out = buildOutputStream(workGroup, outputStream); + // Async output can be started right away. Its internal queues for every view are initialized + // in sendAsync(). Async input is different, views are explicitly registered in it using + // registerView() method. This is why it is started below, after all tasks are created + out.start(); final AtomicInteger runningCount = new AtomicInteger(toReceive.size()); for (final MerkleNode root : toReceive) { @@ -253,8 +258,9 @@ private List receiveTreesInParallel(final List toReceive }); } + // All views have registered themselves in async input. It can now accept messages from the + // underlying input stream and put them to the right view's queues. It's time to start it in.start(); - out.start(); InterruptedException interruptException = null; try { diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java index 1690722789a6..63f22d8b2e42 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java @@ -180,6 +180,10 @@ private void sendTreesInParallel(final List toSend) throws Inter final AsyncInputStream in = new AsyncInputStream(inputStream, workGroup, reconnectConfig); final AsyncOutputStream out = buildOutputStream(workGroup, outputStream); + // Async output can be started right away. Its internal queues for every view are initialized + // in sendAsync(). Async input is different, views are explicitly registered in it using + // registerView() method. This is why it is started below, after all tasks are created + out.start(); final AtomicInteger running = new AtomicInteger(toSend.size()); for (final TeacherSubtree subtree : toSend) { @@ -202,8 +206,9 @@ private void sendTreesInParallel(final List toSend) throws Inter }); } + // All views have registered themselves in async input. It can now accept messages from the + // underlying input stream and put them to the right view's queues. It's time to start it in.start(); - out.start(); workGroup.waitForTermination(); diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/streams/AsyncOutputStream.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/streams/AsyncOutputStream.java index 14b6beb031b1..1bf6e03f61bb 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/streams/AsyncOutputStream.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/streams/AsyncOutputStream.java @@ -33,6 +33,7 @@ import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; @@ -120,12 +121,13 @@ public AsyncOutputStream( this.workGroup = Objects.requireNonNull(workGroup, "workGroup must not be null"); this.queueSize = config.asyncStreamBufferSize(); this.streamQueue = new LinkedBlockingQueue<>(queueSize * config.maxParallelSubtrees()); - this.viewMessages = new HashMap<>(); this.alive = true; this.timeSinceLastFlush = new StopWatch(); this.timeSinceLastFlush.start(); this.flushInterval = config.asyncOutputStreamFlush(); this.timeout = config.asyncStreamTimeout(); + + this.viewMessages = new ConcurrentHashMap<>(); } /** diff --git a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java index 7e96d44122b3..c7726100e68d 100644 --- a/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java +++ b/platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeReceiveTask.java @@ -120,7 +120,7 @@ private void rateLimit() throws InterruptedException { private void run() { boolean success = false; try { - in.anticipateMessage(); // anticipate root node response + in.anticipateMessage(); // anticipate root node request while (true) { rateLimit(); final PullVirtualTreeRequest request = in.readAnticipatedMessage(viewId);