-
Notifications
You must be signed in to change notification settings - Fork 352
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provides GracefulShutdown functionality #1078
base: master
Are you sure you want to change the base?
Conversation
64e7f1f
to
5582290
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great changes!
There are 5 sinks passed around for coordination, in addition to 2 more Mono
s for when both sides are completed or closed. It takes some effort to trace their usages and I'm thinking we could encapsulate this behavior a bit to make the graceful shutdown code a bit more readable and intuitive.
As far as I can see this is primarily requester side driven, but the responder does need to be told when graceful shutdown begins, and it also needs to be able to communicate when it is done on its own side.
Let's say we have a package private contract like this:
interface GracefulShutdownTracker {
Mono<Void> onGracefulShutdownStarted();
Sinks.Empty<Void> responderCompletedSink();
Sinks.Empty<Void> responderClosedSink();
}
Then inside RSocketRequester
:
class RSocketRequester extends RequesterResponderSupport implements RSocket {
private final DefaultGracefulShutdownTracker shutdownTracker = new DefaultGracefulShutdownTracker();
// ...
public GracefulShutdownTracker getGracefulShutdownTracker() {
return gracefulShutdownTracker;
}
private final static class DefaultGracefulShutdownTracker implements GracefulShutdownTracker {
private final Sinks.Empty<Void> startedSink = Sinks.unsafe().empty();
private final Sinks.Empty<Void> completedSink = Sinks.unsafe().empty();
private final Sinks.Empty<Void> closedSink = Sinks.unsafe().empty();
private final Sinks.Empty<Void> responderCompletedSink = Sinks.unsafe().empty();
private final Sinks.Empty<Void> responderClosedSink = Sinks.unsafe().empty();
public Mono<Void> onGracefulShutdownStarted() {
return startedSink.asMono();
}
@Override
public Sinks.Empty<Void> responderCompletedSink() {
return responderCompletedSink;
}
@Override
public Sinks.Empty<Void> responderClosedSink() {
return responderClosedSink;
}
Mono<Void> onBothCompleted() {
return Mono.whenDelayError(
responderCompletedSink.asMono(),
completedSink.asMono());
}
Mono<Void> onBothClosed() {
return Mono.whenDelayError(
responderClosedSink.asMono(),
closedSink.asMono());
}
}
}
Now RSocketRequester
has all the Sinks and Monos it needs access internally to do its work, and also declares a GracefulShutdownControl
getter that exposes what the responder needs to do its work.
This makes it clear who has what responsibility, it removes the need to pass local variables around, replacing those with a single container, and it's an opportunity to shorten sink names, to give monos a name, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more note. Currently, RequestResponderSupport
has an onGracefulShutdownSink
as a constructor initiailized field.
In my proposal above, this could be replaced with an abstract method, something like void GracefulShutdownCompleted()
which the requester and the responder would implemented accordingly based on what's available to them.
5afbd9d
to
5b0f592
Compare
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
5049c36
to
3bf4de8
Compare
closes #988
Signed-off-by: Oleh Dokuka [email protected]
Signed-off-by: Oleh Dokuka [email protected]