Skip to content

Commit

Permalink
fixes with UnboundedProcessor and TCP / WS connections design
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Dec 1, 2022
1 parent 608c9eb commit 5afbd9d
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ private void terminate(Throwable e) {
requesterLeaseTracker.dispose(e);
}

final Collection<FrameHandler> activeStreamsCopy;
final Collection<FrameHandler> activeStreamsCopy; // in case of graceful shut down is empty
synchronized (this) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
activeStreamsCopy = new ArrayList<>(activeStreams.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {

public synchronized boolean add(int streamId, FrameHandler frameHandler) {
if (this.terminating) {
throw new CanceledException("Disposed");
throw new CanceledException(
"This RSocket is either disposed or disposing, and no longer accepting new requests");
}

final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public abstract class BaseDuplexConnection implements DuplexConnection {
protected final UnboundedProcessor sender =
new UnboundedProcessor(
() -> {
System.out.println("queue is done");
onClose.tryEmitEmpty();
dispose();
// dispose();
},
(__) -> {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ public boolean tryEmitFinal(ByteBuf t) {
return false;
}

this.done = true;
this.last = t;
this.done = true;

final long previousState = markValueAddedAndTerminated(this);
if (isFinalized(previousState)) {
Expand All @@ -216,6 +216,7 @@ public boolean tryEmitFinal(ByteBuf t) {
if (this.outputFused) {
// fast path for fusion
this.actual.onNext(null);
this.actual.onComplete();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -189,7 +188,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
}
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");
resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();

Expand Down Expand Up @@ -227,7 +226,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
}
final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);

resumableConnection.dispose(t);
resumableConnection.dispose(nextDuplexConnection, t);

nextDuplexConnection.sendErrorAndClose(t);
nextDuplexConnection.receive().subscribe().dispose();
Expand Down Expand Up @@ -278,7 +277,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");

resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);

nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
Expand All @@ -292,7 +291,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
exception);
}
if (exception instanceof RejectedResumeException) {
resumableConnection.dispose(exception);
resumableConnection.dispose(nextDuplexConnection, exception);
nextDuplexConnection.dispose();
nextDuplexConnection.receive().subscribe().dispose();
return;
Expand All @@ -309,7 +308,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");

resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);

nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
Expand Down Expand Up @@ -349,11 +348,7 @@ public void onError(Throwable t) {
Operators.onErrorDropped(t, currentContext());
}

if (t instanceof TimeoutException) {
resumableConnection.dispose();
} else {
resumableConnection.dispose(t);
}
resumableConnection.dispose();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.exceptions.ConnectionCloseException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.internal.UnboundedProcessor;
import java.net.SocketAddress;
Expand Down Expand Up @@ -50,8 +50,8 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
final ResumableFramesStore resumableFramesStore;

final UnboundedProcessor savableFramesSender;
final Disposable framesSaverDisposable;
final Sinks.Empty<Void> onClose;
final Sinks.Empty<Void> onQueueClose;
final Sinks.Empty<Void> onLastConnectionClose;
final SocketAddress remoteAddress;
final Sinks.Many<Integer> onConnectionClosedSink;

Expand Down Expand Up @@ -79,11 +79,13 @@ public ResumableDuplexConnection(
this.session = session.toString(CharsetUtil.UTF_8);
this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer();
this.resumableFramesStore = resumableFramesStore;
this.savableFramesSender = new UnboundedProcessor();
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
this.onClose = Sinks.empty();
this.onQueueClose = Sinks.unsafe().empty();
this.onLastConnectionClose = Sinks.unsafe().empty();
this.savableFramesSender = new UnboundedProcessor(onQueueClose::tryEmitEmpty, __ -> {});
this.remoteAddress = initialConnection.remoteAddress();

resumableFramesStore.saveFrames(savableFramesSender).subscribe();

ACTIVE_CONNECTION.lazySet(this, initialConnection);
}

Expand Down Expand Up @@ -120,10 +122,12 @@ void initConnection(DuplexConnection nextConnection) {
.resumeStream()
.subscribe(
f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f),
t -> sendErrorAndClose(new ConnectionErrorException(t.getMessage())),
t ->
nextConnection.sendErrorAndClose(
new ConnectionErrorException(t.getMessage(), t)),
() ->
sendErrorAndClose(
new ConnectionCloseException("Connection Closed Unexpectedly")));
nextConnection.sendErrorAndClose(
new ConnectionErrorException("Connection Closed Unexpectedly")));
nextConnection.receive().subscribe(frameReceivingSubscriber);
nextConnection
.onClose()
Expand Down Expand Up @@ -161,9 +165,9 @@ public void disconnect() {
@Override
public void sendFrame(int streamId, ByteBuf frame) {
if (streamId == 0) {
savableFramesSender.onNextPrioritized(frame);
savableFramesSender.tryEmitPrioritized(frame);
} else {
savableFramesSender.onNext(frame);
savableFramesSender.tryEmitNormal(frame);
}
}

Expand All @@ -184,32 +188,25 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
return;
}

activeConnection.sendErrorAndClose(rSocketErrorException);
savableFramesSender.tryEmitFinal(
ErrorFrameCodec.encode(activeConnection.alloc(), 0, rSocketErrorException));

activeConnection
.onClose()
.subscribe(
null,
t -> {
framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();

onClose.tryEmitError(t);
onLastConnectionClose.tryEmitEmpty();
},
() -> {
framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();

final Throwable cause = rSocketErrorException.getCause();
if (cause == null) {
onClose.tryEmitEmpty();
onLastConnectionClose.tryEmitEmpty();
} else {
onClose.tryEmitError(cause);
onLastConnectionClose.tryEmitError(cause);
}
});
}
Expand All @@ -226,50 +223,62 @@ public ByteBufAllocator alloc() {

@Override
public Mono<Void> onClose() {
return onClose.asMono();
return Mono.whenDelayError(
onQueueClose.asMono().log(side + "_queue"),
resumableFramesStore.onClose().log(side + "_frame_store"),
onLastConnectionClose.asMono().log(side + "_last_connection"));
}

@Override
public void dispose() {
dispose(null);
}

void dispose(@Nullable Throwable e) {
logger.info(side + "_disposing");
final DuplexConnection activeConnection =
ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
if (activeConnection == DisposedConnection.INSTANCE) {
return;
}

if (activeConnection != null) {
activeConnection.dispose();
}

if (logger.isDebugEnabled()) {
logger.debug(
"Side[{}]|Session[{}]|DuplexConnection[{}]. Disposing...",
side,
session,
connectionIndex);
}

framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
logger.info(side + "_disposing2");
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();
activeConnection
.onClose()
.subscribe(
null,
t -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
},
() -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
});
}

if (e != null) {
onClose.tryEmitError(e);
} else {
onClose.tryEmitEmpty();
void dispose(DuplexConnection nextConnection, @Nullable Throwable e) {
final DuplexConnection activeConnection =
ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
if (activeConnection == DisposedConnection.INSTANCE) {
return;
}
savableFramesSender.onComplete();
nextConnection
.onClose()
.subscribe(
null,
t -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
},
() -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
});
}

@Override
@SuppressWarnings("ConstantConditions")
public boolean isDisposed() {
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
return onQueueClose.scan(Scannable.Attr.TERMINATED)
|| onQueueClose.scan(Scannable.Attr.CANCELLED);
}

@Override
Expand All @@ -280,6 +289,7 @@ public SocketAddress remoteAddress() {
@Override
public void request(long n) {
if (state == 1 && STATE.compareAndSet(this, 1, 2)) {
// happens for the very first time with the initial connection
initConnection(this.activeConnection);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Mono;

public final class ClientStreamingToServer {

private static final Logger logger = LoggerFactory.getLogger(ClientStreamingToServer.class);

public static void main(String[] args) throws InterruptedException {
RSocketServer.create(
SocketAcceptor.with(
new RSocket() {
final Sinks.Empty<Void> onGracefulShutdownSink = Sinks.unsafe().empty();
(setup, sendingSocket) -> {
sendingSocket.disposeGracefully();

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.interval(Duration.ofMillis(100))
.takeUntilOther(onGracefulShutdownSink.asMono())
.map(aLong -> DefaultPayload.create("Interval: " + aLong));
}
return Mono.just(
new RSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.interval(Duration.ofMillis(100))
.map(aLong -> DefaultPayload.create("Interval: " + aLong));
}

@Override
public void disposeGracefully() {
// can be intercepted there
// onGracefulShutdownSink.tryEmitEmpty();
}
}))
@Override
public void disposeGracefully() {}
});
})
.bindNow(TcpServerTransport.create("localhost", 7000));

RSocket socket =
RSocketConnector.create()
.acceptor(
SocketAcceptor.with(
new RSocket() {
@Override
public void disposeGracefully() {}
}))
.setupPayload(DefaultPayload.create("test", "test"))
.connect(TcpClientTransport.create("localhost", 7000))
.block();
Expand All @@ -73,11 +77,14 @@ public void disposeGracefully() {
logger.debug(msg);
counter.incrementAndGet();
})
.take(100)
.subscribe();

logger.debug("dispose gracefully");
socket.disposeGracefully();
//
// Mono.delay(Duration.ofSeconds(10))
// .doFinally((__) -> socket.dispose())
// .subscribe();

socket.onClose().block();

Expand Down
Loading

0 comments on commit 5afbd9d

Please sign in to comment.