Skip to content

Commit

Permalink
Fixed queue initialization in AsyncOutputStream
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Ananev <[email protected]>
  • Loading branch information
artemananiev committed Apr 16, 2024
1 parent 16330b4 commit 9c2cb9b
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,13 @@ private List<MerkleNode> receiveTreesInParallel(final List<MerkleNode> toReceive
new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, reconnectExceptionListener);

final List<AtomicReference<MerkleNode>> reconstructedRoots = new ArrayList<>(toReceive.size());

final AsyncInputStream in = new AsyncInputStream(inputStream, workGroup, reconnectConfig);
final AsyncOutputStream out = buildOutputStream(workGroup, outputStream);
// Async output can be started right away. Its internal queues for every view are initialized
// in sendAsync(). Async input is different, views are explicitly registered in it using
// registerView() method. This is why it is started below, after all tasks are created
out.start();

final AtomicInteger runningCount = new AtomicInteger(toReceive.size());
for (final MerkleNode root : toReceive) {
Expand Down Expand Up @@ -253,8 +258,9 @@ private List<MerkleNode> receiveTreesInParallel(final List<MerkleNode> toReceive
});
}

// All views have registered themselves in async input. It can now accept messages from the
// underlying input stream and put them to the right view's queues. It's time to start it
in.start();
out.start();

InterruptedException interruptException = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ private void sendTreesInParallel(final List<TeacherSubtree> toSend) throws Inter

final AsyncInputStream in = new AsyncInputStream(inputStream, workGroup, reconnectConfig);
final AsyncOutputStream out = buildOutputStream(workGroup, outputStream);
// Async output can be started right away. Its internal queues for every view are initialized
// in sendAsync(). Async input is different, views are explicitly registered in it using
// registerView() method. This is why it is started below, after all tasks are created
out.start();

final AtomicInteger running = new AtomicInteger(toSend.size());
for (final TeacherSubtree subtree : toSend) {
Expand All @@ -202,8 +206,9 @@ private void sendTreesInParallel(final List<TeacherSubtree> toSend) throws Inter
});
}

// All views have registered themselves in async input. It can now accept messages from the
// underlying input stream and put them to the right view's queues. It's time to start it
in.start();
out.start();

workGroup.waitForTermination();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -120,12 +121,13 @@ public AsyncOutputStream(
this.workGroup = Objects.requireNonNull(workGroup, "workGroup must not be null");
this.queueSize = config.asyncStreamBufferSize();
this.streamQueue = new LinkedBlockingQueue<>(queueSize * config.maxParallelSubtrees());
this.viewMessages = new HashMap<>();
this.alive = true;
this.timeSinceLastFlush = new StopWatch();
this.timeSinceLastFlush.start();
this.flushInterval = config.asyncOutputStreamFlush();
this.timeout = config.asyncStreamTimeout();

this.viewMessages = new ConcurrentHashMap<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void rateLimit() throws InterruptedException {
private void run() {
boolean success = false;
try {
in.anticipateMessage(); // anticipate root node response
in.anticipateMessage(); // anticipate root node request
while (true) {
rateLimit();
final PullVirtualTreeRequest request = in.readAnticipatedMessage(viewId);
Expand Down

0 comments on commit 9c2cb9b

Please sign in to comment.