From 5afbd9d4d6ad763443c8e5347c974709767c7b85 Mon Sep 17 00:00:00 2001 From: OlegDokuka Date: Thu, 1 Dec 2022 14:04:34 +0200 Subject: [PATCH] fixes with UnboundedProcessor and TCP / WS connections design Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka Signed-off-by: OlegDokuka --- .../io/rsocket/core/RSocketRequester.java | 2 +- .../core/RequesterResponderSupport.java | 3 +- .../internal/BaseDuplexConnection.java | 3 +- .../rsocket/internal/UnboundedProcessor.java | 3 +- .../rsocket/resume/ClientRSocketSession.java | 17 +-- .../resume/ResumableDuplexConnection.java | 112 ++++++++++-------- .../ClientStreamingToServer.java | 41 ++++--- .../java/io/rsocket/test/TransportTest.java | 9 +- .../transport/netty/TcpDuplexConnection.java | 51 +------- .../netty/WebsocketDuplexConnection.java | 50 +------- 10 files changed, 109 insertions(+), 182 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index a2fbaf937..01e7e20b5 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -388,7 +388,7 @@ private void terminate(Throwable e) { requesterLeaseTracker.dispose(e); } - final Collection activeStreamsCopy; + final Collection activeStreamsCopy; // in case of graceful shut down is empty synchronized (this) { final IntObjectMap activeStreams = this.activeStreams; activeStreamsCopy = new ArrayList<>(activeStreams.values()); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java index fc45ae694..daee8dcb6 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java @@ -135,7 +135,8 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) { public synchronized boolean add(int streamId, FrameHandler frameHandler) { if (this.terminating) { - throw new CanceledException("Disposed"); + throw new CanceledException( + "This RSocket is either disposed or disposing, and no longer accepting new requests"); } final IntObjectMap activeStreams = this.activeStreams; diff --git a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java index d9abc9219..23ec6baf1 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java @@ -27,8 +27,9 @@ public abstract class BaseDuplexConnection implements DuplexConnection { protected final UnboundedProcessor sender = new UnboundedProcessor( () -> { + System.out.println("queue is done"); onClose.tryEmitEmpty(); - dispose(); + // dispose(); }, (__) -> {}); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 9ed88a206..9d47597c1 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -203,8 +203,8 @@ public boolean tryEmitFinal(ByteBuf t) { return false; } - this.done = true; this.last = t; + this.done = true; final long previousState = markValueAddedAndTerminated(this); if (isFinalized(previousState)) { @@ -216,6 +216,7 @@ public boolean tryEmitFinal(ByteBuf t) { if (this.outputFused) { // fast path for fusion this.actual.onNext(null); + this.actual.onComplete(); return true; } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index 2f2f29001..35ea79177 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -29,7 +29,6 @@ import io.rsocket.frame.ResumeOkFrameCodec; import io.rsocket.keepalive.KeepAliveSupport; import java.time.Duration; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import org.reactivestreams.Subscription; @@ -189,7 +188,7 @@ void tryReestablishSession(Tuple2 tuple2) { } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); - resumableConnection.dispose(connectionErrorException); + resumableConnection.dispose(nextDuplexConnection, connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); @@ -227,7 +226,7 @@ void tryReestablishSession(Tuple2 tuple2) { } final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e); - resumableConnection.dispose(t); + resumableConnection.dispose(nextDuplexConnection, t); nextDuplexConnection.sendErrorAndClose(t); nextDuplexConnection.receive().subscribe().dispose(); @@ -278,7 +277,7 @@ void tryReestablishSession(Tuple2 tuple2) { final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]"); - resumableConnection.dispose(connectionErrorException); + resumableConnection.dispose(nextDuplexConnection, connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); @@ -292,7 +291,7 @@ void tryReestablishSession(Tuple2 tuple2) { exception); } if (exception instanceof RejectedResumeException) { - resumableConnection.dispose(exception); + resumableConnection.dispose(nextDuplexConnection, exception); nextDuplexConnection.dispose(); nextDuplexConnection.receive().subscribe().dispose(); return; @@ -309,7 +308,7 @@ void tryReestablishSession(Tuple2 tuple2) { final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); - resumableConnection.dispose(connectionErrorException); + resumableConnection.dispose(nextDuplexConnection, connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); nextDuplexConnection.receive().subscribe().dispose(); @@ -349,11 +348,7 @@ public void onError(Throwable t) { Operators.onErrorDropped(t, currentContext()); } - if (t instanceof TimeoutException) { - resumableConnection.dispose(); - } else { - resumableConnection.dispose(t); - } + resumableConnection.dispose(); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index f061857ff..8d893ddae 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -21,8 +21,8 @@ import io.netty.util.CharsetUtil; import io.rsocket.DuplexConnection; import io.rsocket.RSocketErrorException; -import io.rsocket.exceptions.ConnectionCloseException; import io.rsocket.exceptions.ConnectionErrorException; +import io.rsocket.frame.ErrorFrameCodec; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.internal.UnboundedProcessor; import java.net.SocketAddress; @@ -50,8 +50,8 @@ public class ResumableDuplexConnection extends Flux final ResumableFramesStore resumableFramesStore; final UnboundedProcessor savableFramesSender; - final Disposable framesSaverDisposable; - final Sinks.Empty onClose; + final Sinks.Empty onQueueClose; + final Sinks.Empty onLastConnectionClose; final SocketAddress remoteAddress; final Sinks.Many onConnectionClosedSink; @@ -79,11 +79,13 @@ public ResumableDuplexConnection( this.session = session.toString(CharsetUtil.UTF_8); this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer(); this.resumableFramesStore = resumableFramesStore; - this.savableFramesSender = new UnboundedProcessor(); - this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe(); - this.onClose = Sinks.empty(); + this.onQueueClose = Sinks.unsafe().empty(); + this.onLastConnectionClose = Sinks.unsafe().empty(); + this.savableFramesSender = new UnboundedProcessor(onQueueClose::tryEmitEmpty, __ -> {}); this.remoteAddress = initialConnection.remoteAddress(); + resumableFramesStore.saveFrames(savableFramesSender).subscribe(); + ACTIVE_CONNECTION.lazySet(this, initialConnection); } @@ -120,10 +122,12 @@ void initConnection(DuplexConnection nextConnection) { .resumeStream() .subscribe( f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f), - t -> sendErrorAndClose(new ConnectionErrorException(t.getMessage())), + t -> + nextConnection.sendErrorAndClose( + new ConnectionErrorException(t.getMessage(), t)), () -> - sendErrorAndClose( - new ConnectionCloseException("Connection Closed Unexpectedly"))); + nextConnection.sendErrorAndClose( + new ConnectionErrorException("Connection Closed Unexpectedly"))); nextConnection.receive().subscribe(frameReceivingSubscriber); nextConnection .onClose() @@ -161,9 +165,9 @@ public void disconnect() { @Override public void sendFrame(int streamId, ByteBuf frame) { if (streamId == 0) { - savableFramesSender.onNextPrioritized(frame); + savableFramesSender.tryEmitPrioritized(frame); } else { - savableFramesSender.onNext(frame); + savableFramesSender.tryEmitNormal(frame); } } @@ -184,32 +188,25 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) { return; } - activeConnection.sendErrorAndClose(rSocketErrorException); + savableFramesSender.tryEmitFinal( + ErrorFrameCodec.encode(activeConnection.alloc(), 0, rSocketErrorException)); + activeConnection .onClose() .subscribe( null, t -> { - framesSaverDisposable.dispose(); - activeReceivingSubscriber.dispose(); - savableFramesSender.onComplete(); - savableFramesSender.cancel(); onConnectionClosedSink.tryEmitComplete(); - - onClose.tryEmitError(t); + onLastConnectionClose.tryEmitEmpty(); }, () -> { - framesSaverDisposable.dispose(); - activeReceivingSubscriber.dispose(); - savableFramesSender.onComplete(); - savableFramesSender.cancel(); onConnectionClosedSink.tryEmitComplete(); final Throwable cause = rSocketErrorException.getCause(); if (cause == null) { - onClose.tryEmitEmpty(); + onLastConnectionClose.tryEmitEmpty(); } else { - onClose.tryEmitError(cause); + onLastConnectionClose.tryEmitError(cause); } }); } @@ -226,50 +223,62 @@ public ByteBufAllocator alloc() { @Override public Mono onClose() { - return onClose.asMono(); + return Mono.whenDelayError( + onQueueClose.asMono().log(side + "_queue"), + resumableFramesStore.onClose().log(side + "_frame_store"), + onLastConnectionClose.asMono().log(side + "_last_connection")); } @Override public void dispose() { - dispose(null); - } - - void dispose(@Nullable Throwable e) { + logger.info(side + "_disposing"); final DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE); if (activeConnection == DisposedConnection.INSTANCE) { return; } - - if (activeConnection != null) { - activeConnection.dispose(); - } - - if (logger.isDebugEnabled()) { - logger.debug( - "Side[{}]|Session[{}]|DuplexConnection[{}]. Disposing...", - side, - session, - connectionIndex); - } - - framesSaverDisposable.dispose(); - activeReceivingSubscriber.dispose(); + logger.info(side + "_disposing2"); savableFramesSender.onComplete(); - savableFramesSender.cancel(); - onConnectionClosedSink.tryEmitComplete(); + activeConnection + .onClose() + .subscribe( + null, + t -> { + onConnectionClosedSink.tryEmitComplete(); + onLastConnectionClose.tryEmitEmpty(); + }, + () -> { + onConnectionClosedSink.tryEmitComplete(); + onLastConnectionClose.tryEmitEmpty(); + }); + } - if (e != null) { - onClose.tryEmitError(e); - } else { - onClose.tryEmitEmpty(); + void dispose(DuplexConnection nextConnection, @Nullable Throwable e) { + final DuplexConnection activeConnection = + ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE); + if (activeConnection == DisposedConnection.INSTANCE) { + return; } + savableFramesSender.onComplete(); + nextConnection + .onClose() + .subscribe( + null, + t -> { + onConnectionClosedSink.tryEmitComplete(); + onLastConnectionClose.tryEmitEmpty(); + }, + () -> { + onConnectionClosedSink.tryEmitComplete(); + onLastConnectionClose.tryEmitEmpty(); + }); } @Override @SuppressWarnings("ConstantConditions") public boolean isDisposed() { - return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED); + return onQueueClose.scan(Scannable.Attr.TERMINATED) + || onQueueClose.scan(Scannable.Attr.CANCELLED); } @Override @@ -280,6 +289,7 @@ public SocketAddress remoteAddress() { @Override public void request(long n) { if (state == 1 && STATE.compareAndSet(this, 1, 2)) { + // happens for the very first time with the initial connection initConnection(this.activeConnection); } } diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java index 94ad219fe..28eb65adf 100644 --- a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/gracefulshutdown/ClientStreamingToServer.java @@ -29,7 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; +import reactor.core.publisher.Mono; public final class ClientStreamingToServer { @@ -37,27 +37,31 @@ public final class ClientStreamingToServer { public static void main(String[] args) throws InterruptedException { RSocketServer.create( - SocketAcceptor.with( - new RSocket() { - final Sinks.Empty onGracefulShutdownSink = Sinks.unsafe().empty(); + (setup, sendingSocket) -> { + sendingSocket.disposeGracefully(); - @Override - public Flux requestStream(Payload payload) { - return Flux.interval(Duration.ofMillis(100)) - .takeUntilOther(onGracefulShutdownSink.asMono()) - .map(aLong -> DefaultPayload.create("Interval: " + aLong)); - } + return Mono.just( + new RSocket() { + @Override + public Flux requestStream(Payload payload) { + return Flux.interval(Duration.ofMillis(100)) + .map(aLong -> DefaultPayload.create("Interval: " + aLong)); + } - @Override - public void disposeGracefully() { - // can be intercepted there - // onGracefulShutdownSink.tryEmitEmpty(); - } - })) + @Override + public void disposeGracefully() {} + }); + }) .bindNow(TcpServerTransport.create("localhost", 7000)); RSocket socket = RSocketConnector.create() + .acceptor( + SocketAcceptor.with( + new RSocket() { + @Override + public void disposeGracefully() {} + })) .setupPayload(DefaultPayload.create("test", "test")) .connect(TcpClientTransport.create("localhost", 7000)) .block(); @@ -73,11 +77,14 @@ public void disposeGracefully() { logger.debug(msg); counter.incrementAndGet(); }) - .take(100) .subscribe(); logger.debug("dispose gracefully"); socket.disposeGracefully(); + // + // Mono.delay(Duration.ofSeconds(10)) + // .doFinally((__) -> socket.dispose()) + // .subscribe(); socket.onClose().block(); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index bbe8175f8..8d33c71dd 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -85,7 +85,7 @@ static String read(String resourceName) { @BeforeEach default void setup() { - // Hooks.onOperatorDebug(); + Hooks.onOperatorDebug(); } @AfterEach @@ -643,10 +643,9 @@ public String expectedPayloadMetadata() { public void awaitClosed() { client .onClose() - .log("client") .onErrorResume(t -> Mono.empty()) .then(Mono.fromRunnable(server::dispose)) - .then(server.onClose().delaySubscription(Duration.ofSeconds(1)).log("server")) + .then(server.onClose().delaySubscription(Duration.ofSeconds(1))) .block(Duration.ofMinutes(1)); } @@ -701,8 +700,8 @@ public SocketAddress remoteAddress() { @Override public Mono onClose() { return Mono.whenDelayError( - duplexConnection.onClose().log(duplexConnection + " original " + tag), - bufReleaserOperator.onClose().log(duplexConnection + " buffer " + tag)); + duplexConnection.onClose(), + bufReleaserOperator.onClose()); } @Override diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index 3e9698b87..0445f5c02 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -25,19 +25,13 @@ import io.rsocket.internal.BaseDuplexConnection; import java.net.SocketAddress; import java.util.Objects; -import java.util.logging.Level; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.netty.Connection; /** An implementation of {@link DuplexConnection} that connects via TCP. */ public final class TcpDuplexConnection extends BaseDuplexConnection { - static final Logger LOGGER = LoggerFactory.getLogger(TcpDuplexConnection.class); - private final Connection connection; /** @@ -48,18 +42,11 @@ public final class TcpDuplexConnection extends BaseDuplexConnection { public TcpDuplexConnection(Connection connection) { this.connection = Objects.requireNonNull(connection, "connection must not be null"); - // connection.channel().closeFuture().addListener(future -> sender.onComplete()); - connection .outbound() - .send( - sender.log( - "queue " + this, - Level.INFO, - SignalType.CANCEL, - SignalType.ON_ERROR, - SignalType.ON_COMPLETE)) + .send(sender.hide()) .then() + .doFinally(__ -> connection.dispose()) .subscribe(); } @@ -80,43 +67,13 @@ protected void doOnClose() { @Override public Mono onClose() { - return Mono.whenDelayError( - super.onClose() - .doOnEach( - s -> - LOGGER.info( - this - + " queue " - + s.getType() - + " " - + s.getContextView().getOrDefault("tag", ""))), - connection - .onDispose() - .doOnEach( - s -> - LOGGER.info( - this - + " connection " - + s.getType() - + " " - + s.getContextView().getOrDefault("tag", "")))); + return Mono.whenDelayError(super.onClose(), connection.onTerminate()); } @Override public void sendErrorAndClose(RSocketErrorException e) { final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e); - // connection - // .outbound() - // .sendObject(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame)) - // .subscribe( - // new BaseSubscriber() { - // @Override - // protected void hookFinally(SignalType type) { - // connection.dispose(); - // } - // }); - - sender.tryEmitFinal(errorFrame); + sender.tryEmitFinal(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame)); } @Override diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 48cfd9d42..9deef6030 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -24,12 +24,8 @@ import io.rsocket.internal.BaseDuplexConnection; import java.net.SocketAddress; import java.util.Objects; -import java.util.logging.Level; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.netty.Connection; /** @@ -40,7 +36,6 @@ * stitched back on for frames received. */ public final class WebsocketDuplexConnection extends BaseDuplexConnection { - static final Logger LOGGER = LoggerFactory.getLogger(WebsocketDuplexConnection.class); private final Connection connection; @@ -52,20 +47,11 @@ public final class WebsocketDuplexConnection extends BaseDuplexConnection { public WebsocketDuplexConnection(Connection connection) { this.connection = Objects.requireNonNull(connection, "connection must not be null"); - // connection.channel().closeFuture().addListener(future -> sender.onComplete()); - connection .outbound() - .sendObject( - sender - .map(BinaryWebSocketFrame::new) - .log( - "queue " + this, - Level.INFO, - SignalType.CANCEL, - SignalType.ON_ERROR, - SignalType.ON_COMPLETE)) + .sendObject(sender.map(BinaryWebSocketFrame::new).hide()) .then() + .doFinally(__ -> connection.dispose()) .subscribe(); } @@ -86,26 +72,7 @@ protected void doOnClose() { @Override public Mono onClose() { - return Mono.whenDelayError( - super.onClose() - .doOnEach( - s -> - LOGGER.info( - this - + " queue" - + s.getType() - + " " - + s.getContextView().getOrDefault("tag", ""))), - connection - .onDispose() - .doOnEach( - s -> - LOGGER.info( - this - + " connection" - + s.getType() - + " " - + s.getContextView().getOrDefault("tag", "")))); + return Mono.whenDelayError(super.onClose(), connection.onTerminate()); } @Override @@ -116,17 +83,6 @@ public Flux receive() { @Override public void sendErrorAndClose(RSocketErrorException e) { final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e); - // connection - // .outbound() - // .sendObject(new BinaryWebSocketFrame(errorFrame)) - // .subscribe( - // new BaseSubscriber() { - // @Override - // protected void hookFinally(SignalType type) { - // connection.dispose(); - // } - // }); - sender.tryEmitFinal(errorFrame); } }