Skip to content

Commit

Permalink
DoExchange snapshot won't half-close until after client does (deephav…
Browse files Browse the repository at this point in the history
…en#4653)

Additional mitigation for
envoyproxy/envoy#30149 - combined with a
client that will always half-close before the server does, this ensures
that the server will never need to half-close first in order to end a
stream. This is technically a breaking change - existing clients which
fail to half-close will instead hang with the connection left open until
timeout.

Fixes deephaven#4627
  • Loading branch information
niloc132 authored Oct 17, 2023
1 parent 4b792cf commit 186891c
Showing 1 changed file with 66 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import static io.deephaven.extensions.barrage.util.BarrageUtil.DEFAULT_SNAPSHOT_DESER_OPTIONS;

Expand Down Expand Up @@ -270,6 +269,31 @@ public void close() {
}
}

/**
* Represents states for a DoExchange stream where the server must not close until the client has half closed.
*/
enum HalfClosedState {
/**
* Client has not half-closed, server should not half close until the client has done so.
*/
DONT_CLOSE,
/**
* Indicates that the client has half-closed, and the server should half close immediately after finishing
* sending data.
*/
CLIENT_HALF_CLOSED,

/**
* The server has no more data to send, but client hasn't half-closed.
*/
FINISHED_SENDING,

/**
* Streaming finished and client half-closed.
*/
CLOSED
}

/**
* Helper class that maintains a subscription whether it was created by a bi-directional stream request or the
* no-client-streaming request. If the SubscriptionRequest sets the sequence, then it treats sequence as a watermark
Expand Down Expand Up @@ -439,6 +463,8 @@ private void tryClose() {
*/
private class SnapshotRequestHandler
implements Handler {
private final AtomicReference<HalfClosedState> halfClosedState =
new AtomicReference<>(HalfClosedState.DONT_CLOSE);

public SnapshotRequestHandler() {}

Expand Down Expand Up @@ -493,14 +519,51 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
// leverage common code for `DoGet` and `BarrageSnapshotOptions`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport,
reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener, metrics);
listener.onCompleted();
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed, time to
// half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
});
}
}

@Override
public void close() {
// no work to do for DoGetRequest close
// possibly safely complete if finished sending data
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have half closed
return HalfClosedState.CLIENT_HALF_CLOSED;
case FINISHED_SENDING:
// since client has now half closed, and we're done sending, time to half-close from server
return HalfClosedState.CLOSED;
case CLIENT_HALF_CLOSED:
case CLOSED:
throw new IllegalStateException("Can't close twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
}
}

Expand Down

0 comments on commit 186891c

Please sign in to comment.