From 83c783bc8e8712ec1e8b1b558bce3165c06116ad Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sun, 29 Sep 2024 18:25:40 +0300 Subject: [PATCH] Compactede ws/hjttp gateways --- .../services/gateway/GatewayTemplate.java | 76 ------- .../services/gateway/http/HttpGateway.java | 214 +++++++++--------- .../services/gateway/ws/WebsocketGateway.java | 175 ++++++++------ .../services/gateway/http/CorsTest.java | 18 +- .../http/HttpClientConnectionTest.java | 2 +- .../gateway/http/HttpGatewayExtension.java | 2 +- .../http/HttpLocalGatewayErrorMapperTest.java | 7 +- .../http/HttpLocalGatewayExtension.java | 2 +- .../WebsocketClientConnectionTest.java | 6 +- .../websocket/WebsocketClientTest.java | 7 +- .../websocket/WebsocketGatewayExtension.java | 2 +- .../WebsocketLocalGatewayErrorMapperTest.java | 6 +- .../WebsocketLocalGatewayExtension.java | 2 +- .../WebsocketLocalWithAuthExtension.java | 5 +- .../websocket/WebsocketServerTest.java | 7 +- 15 files changed, 262 insertions(+), 269 deletions(-) delete mode 100644 services-gateway/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java b/services-gateway/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java deleted file mode 100644 index 4883719b9..000000000 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java +++ /dev/null @@ -1,76 +0,0 @@ -package io.scalecube.services.gateway; - -import java.net.InetSocketAddress; -import reactor.core.publisher.Mono; -import reactor.netty.DisposableServer; -import reactor.netty.http.server.HttpServer; -import reactor.netty.resources.LoopResources; - -public abstract class GatewayTemplate implements Gateway { - - protected final GatewayOptions options; - - protected GatewayTemplate(GatewayOptions options) { - this.options = - new GatewayOptions() - .id(options.id()) - .port(options.port()) - .workerPool(options.workerPool()) - .call(options.call()); - } - - @Override - public final String id() { - return options.id(); - } - - /** - * Builds generic http server with given parameters. - * - * @param loopResources loop resources - * @param port listen port - * @return http server - */ - protected HttpServer prepareHttpServer(LoopResources loopResources, int port) { - return HttpServer.create() - .tcpConfiguration( - tcpServer -> { - if (loopResources != null) { - tcpServer = tcpServer.runOn(loopResources); - } - return tcpServer.bindAddress(() -> new InetSocketAddress(port)); - }); - } - - /** - * Shutting down loopResources if it's not null. - * - * @return mono handle - */ - protected final Mono shutdownLoopResources(LoopResources loopResources) { - return Mono.defer( - () -> { - if (loopResources == null) { - return Mono.empty(); - } - return loopResources.disposeLater(); - }); - } - - /** - * Shutting down server of type {@link DisposableServer} if it's not null. - * - * @param server server - * @return mono hanle - */ - protected final Mono shutdownServer(DisposableServer server) { - return Mono.defer( - () -> { - if (server == null) { - return Mono.empty(); - } - server.dispose(); - return server.onDispose(); - }); - } -} diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java index d1c9a630b..04c120f65 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java @@ -1,7 +1,6 @@ package io.scalecube.services.gateway.http; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.cors.CorsConfig; import io.netty.handler.codec.http.cors.CorsConfigBuilder; import io.netty.handler.codec.http.cors.CorsHandler; import io.scalecube.services.Address; @@ -9,10 +8,9 @@ import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayOptions; -import io.scalecube.services.gateway.GatewayTemplate; import java.net.InetSocketAddress; -import java.util.Map.Entry; import java.util.StringJoiner; +import java.util.function.Consumer; import java.util.function.UnaryOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -20,129 +18,50 @@ import reactor.netty.http.server.HttpServer; import reactor.netty.resources.LoopResources; -public class HttpGateway extends GatewayTemplate { +public class HttpGateway implements Gateway { + private final GatewayOptions options; private final ServiceProviderErrorMapper errorMapper; + private final boolean corsEnabled; + private final CorsConfigBuilder corsConfigBuilder; private DisposableServer server; private LoopResources loopResources; - private boolean corsEnabled = false; - private CorsConfigBuilder corsConfigBuilder = - CorsConfigBuilder.forAnyOrigin() - .allowNullOrigin() - .maxAge(3600) - .allowedRequestMethods(HttpMethod.POST); - - public HttpGateway(GatewayOptions options) { - this(options, DefaultErrorMapper.INSTANCE); - } - - public HttpGateway(GatewayOptions options, ServiceProviderErrorMapper errorMapper) { - super(options); - this.errorMapper = errorMapper; + private HttpGateway(Builder builder) { + this.options = builder.options; + this.errorMapper = builder.errorMapper; + this.corsEnabled = builder.corsEnabled; + this.corsConfigBuilder = builder.corsConfigBuilder; } - private HttpGateway(HttpGateway other) { - super(other.options); - this.server = other.server; - this.loopResources = other.loopResources; - this.corsEnabled = other.corsEnabled; - this.corsConfigBuilder = copy(other.corsConfigBuilder); - this.errorMapper = other.errorMapper; + public HttpGateway(UnaryOperator operator) { + this(operator.apply(new Builder())); } - /** - * CORS enable. - * - * @param corsEnabled if set to true. - * @return HttpGateway with CORS settings. - */ - public HttpGateway corsEnabled(boolean corsEnabled) { - HttpGateway g = new HttpGateway(this); - g.corsEnabled = corsEnabled; - return g; - } - - /** - * Configure CORS with options. - * - * @param op for CORS. - * @return HttpGateway with CORS settings. - */ - public HttpGateway corsConfig(UnaryOperator op) { - HttpGateway g = new HttpGateway(this); - g.corsConfigBuilder = copy(op.apply(g.corsConfigBuilder)); - return g; - } - - private CorsConfigBuilder copy(CorsConfigBuilder other) { - CorsConfig config = other.build(); - CorsConfigBuilder corsConfigBuilder; - if (config.isAnyOriginSupported()) { - corsConfigBuilder = CorsConfigBuilder.forAnyOrigin(); - } else { - corsConfigBuilder = CorsConfigBuilder.forOrigins(config.origins().toArray(new String[0])); - } - - if (!config.isCorsSupportEnabled()) { - corsConfigBuilder.disable(); - } - - corsConfigBuilder - .exposeHeaders(config.exposedHeaders().toArray(new String[0])) - .allowedRequestHeaders(config.allowedRequestHeaders().toArray(new String[0])) - .allowedRequestMethods(config.allowedRequestMethods().toArray(new HttpMethod[0])) - .maxAge(config.maxAge()); - - for (Entry header : config.preflightResponseHeaders()) { - corsConfigBuilder.preflightResponseHeader(header.getKey(), header.getValue()); - } - - if (config.isShortCircuit()) { - corsConfigBuilder.shortCircuit(); - } - - if (config.isNullOriginAllowed()) { - corsConfigBuilder.allowNullOrigin(); - } - - if (config.isCredentialsAllowed()) { - corsConfigBuilder.allowCredentials(); - } - - return corsConfigBuilder; + @Override + public String id() { + return options.id(); } @Override public Mono start() { return Mono.defer( () -> { - HttpGatewayAcceptor acceptor = new HttpGatewayAcceptor(options.call(), errorMapper); + HttpGatewayAcceptor gatewayAcceptor = + new HttpGatewayAcceptor(options.call(), errorMapper); - loopResources = LoopResources.create("http-gateway"); + loopResources = LoopResources.create(options.id() + ":" + options.port()); return prepareHttpServer(loopResources, options.port()) - .handle(acceptor) + .handle(gatewayAcceptor) .bind() .doOnSuccess(server -> this.server = server) .thenReturn(this); }); } - @Override - public Address address() { - InetSocketAddress address = (InetSocketAddress) server.address(); - return Address.create(address.getHostString(), address.getPort()); - } - - @Override - public Mono stop() { - return Flux.concatDelayError(shutdownServer(server), shutdownLoopResources(loopResources)) - .then(); - } - - protected HttpServer prepareHttpServer(LoopResources loopResources, int port) { + private HttpServer prepareHttpServer(LoopResources loopResources, int port) { HttpServer httpServer = HttpServer.create(); if (loopResources != null) { @@ -159,14 +78,101 @@ protected HttpServer prepareHttpServer(LoopResources loopResources, int port) { }); } + @Override + public Address address() { + InetSocketAddress address = (InetSocketAddress) server.address(); + return Address.create(address.getHostString(), address.getPort()); + } + + @Override + public Mono stop() { + return Flux.concatDelayError(shutdownServer(server), shutdownLoopResources(loopResources)) + .then(); + } + + private Mono shutdownServer(DisposableServer server) { + return Mono.defer( + () -> { + if (server != null) { + server.dispose(); + return server.onDispose(); + } + return Mono.empty(); + }); + } + + private Mono shutdownLoopResources(LoopResources loopResources) { + return loopResources != null ? loopResources.disposeLater() : Mono.empty(); + } + @Override public String toString() { return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]") - .add("server=" + server) - .add("loopResources=" + loopResources) + .add("options=" + options) + .add("errorMapper=" + errorMapper) .add("corsEnabled=" + corsEnabled) .add("corsConfigBuilder=" + corsConfigBuilder) - .add("options=" + options) + .add("server=" + server) + .add("loopResources=" + loopResources) .toString(); } + + public static class Builder { + + private GatewayOptions options; + private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; + private boolean corsEnabled = false; + private CorsConfigBuilder corsConfigBuilder = + CorsConfigBuilder.forAnyOrigin() + .allowNullOrigin() + .maxAge(3600) + .allowedRequestMethods(HttpMethod.POST); + + public Builder() {} + + public GatewayOptions options() { + return options; + } + + public Builder options(GatewayOptions options) { + this.options = options; + return this; + } + + public ServiceProviderErrorMapper errorMapper() { + return errorMapper; + } + + public Builder errorMapper(ServiceProviderErrorMapper errorMapper) { + this.errorMapper = errorMapper; + return this; + } + + public boolean corsEnabled() { + return corsEnabled; + } + + public Builder corsEnabled(boolean corsEnabled) { + this.corsEnabled = corsEnabled; + return this; + } + + public CorsConfigBuilder corsConfigBuilder() { + return corsConfigBuilder; + } + + public Builder corsConfigBuilder(CorsConfigBuilder corsConfigBuilder) { + this.corsConfigBuilder = corsConfigBuilder; + return this; + } + + public Builder corsConfigBuilder(Consumer consumer) { + consumer.accept(this.corsConfigBuilder); + return this; + } + + public HttpGateway build() { + return new HttpGateway(this); + } + } } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java b/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java index 0da8b3c04..a4cc13a52 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/ws/WebsocketGateway.java @@ -7,18 +7,20 @@ import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayOptions; import io.scalecube.services.gateway.GatewaySessionHandler; -import io.scalecube.services.gateway.GatewayTemplate; import java.net.InetSocketAddress; import java.time.Duration; import java.util.StringJoiner; +import java.util.function.UnaryOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.DisposableServer; +import reactor.netty.http.server.HttpServer; import reactor.netty.resources.LoopResources; -public class WebsocketGateway extends GatewayTemplate { +public class WebsocketGateway implements Gateway { + private final GatewayOptions options; private final GatewaySessionHandler gatewayHandler; private final Duration keepAliveInterval; private final ServiceProviderErrorMapper errorMapper; @@ -26,90 +28,51 @@ public class WebsocketGateway extends GatewayTemplate { private DisposableServer server; private LoopResources loopResources; - /** - * Constructor. - * - * @param options gateway options - */ - public WebsocketGateway(GatewayOptions options) { - this( - options, - Duration.ZERO, - GatewaySessionHandler.DEFAULT_INSTANCE, - DefaultErrorMapper.INSTANCE); + private WebsocketGateway(Builder builder) { + this.options = builder.options; + this.gatewayHandler = builder.gatewayHandler; + this.keepAliveInterval = builder.keepAliveInterval; + this.errorMapper = builder.errorMapper; } - /** - * Constructor. - * - * @param options gateway options - * @param keepAliveInterval keep alive interval - */ - public WebsocketGateway(GatewayOptions options, Duration keepAliveInterval) { - this( - options, - keepAliveInterval, - GatewaySessionHandler.DEFAULT_INSTANCE, - DefaultErrorMapper.INSTANCE); + public WebsocketGateway(UnaryOperator operator) { + this(operator.apply(new Builder())); } - /** - * Constructor. - * - * @param options gateway options - * @param gatewayHandler gateway handler - */ - public WebsocketGateway(GatewayOptions options, GatewaySessionHandler gatewayHandler) { - this(options, Duration.ZERO, gatewayHandler, DefaultErrorMapper.INSTANCE); - } - - /** - * Constructor. - * - * @param options gateway options - * @param errorMapper error mapper - */ - public WebsocketGateway(GatewayOptions options, ServiceProviderErrorMapper errorMapper) { - this(options, Duration.ZERO, GatewaySessionHandler.DEFAULT_INSTANCE, errorMapper); - } - - /** - * Constructor. - * - * @param options gateway options - * @param keepAliveInterval keep alive interval - * @param gatewayHandler gateway handler - * @param errorMapper error mapper - */ - public WebsocketGateway( - GatewayOptions options, - Duration keepAliveInterval, - GatewaySessionHandler gatewayHandler, - ServiceProviderErrorMapper errorMapper) { - super(options); - this.keepAliveInterval = keepAliveInterval; - this.gatewayHandler = gatewayHandler; - this.errorMapper = errorMapper; + @Override + public String id() { + return options.id(); } @Override public Mono start() { return Mono.defer( () -> { - WebsocketGatewayAcceptor acceptor = + WebsocketGatewayAcceptor gatewayAcceptor = new WebsocketGatewayAcceptor(options.call(), gatewayHandler, errorMapper); - loopResources = LoopResources.create("websocket-gateway"); + loopResources = LoopResources.create(options.id() + ":" + options.port()); return prepareHttpServer(loopResources, options.port()) .doOnConnection(this::setupKeepAlive) - .handle(acceptor) + .handle(gatewayAcceptor) .bind() .doOnSuccess(server -> this.server = server) .thenReturn(this); }); } + private HttpServer prepareHttpServer(LoopResources loopResources, int port) { + return HttpServer.create() + .tcpConfiguration( + tcpServer -> { + if (loopResources != null) { + tcpServer = tcpServer.runOn(loopResources); + } + return tcpServer.bindAddress(() -> new InetSocketAddress(port)); + }); + } + @Override public Address address() { InetSocketAddress address = (InetSocketAddress) server.address(); @@ -122,13 +85,19 @@ public Mono stop() { .then(); } - @Override - public String toString() { - return new StringJoiner(", ", WebsocketGateway.class.getSimpleName() + "[", "]") - .add("server=" + server) - .add("loopResources=" + loopResources) - .add("options=" + options) - .toString(); + private Mono shutdownServer(DisposableServer server) { + return Mono.defer( + () -> { + if (server != null) { + server.dispose(); + return server.onDispose(); + } + return Mono.empty(); + }); + } + + private Mono shutdownLoopResources(LoopResources loopResources) { + return loopResources != null ? loopResources.disposeLater() : Mono.empty(); } private void setupKeepAlive(Connection connection) { @@ -162,4 +131,66 @@ private void onReadIdle(Connection connection) { // no-op }); } + + @Override + public String toString() { + return new StringJoiner(", ", WebsocketGateway.class.getSimpleName() + "[", "]") + .add("options=" + options) + .add("gatewayHandler=" + gatewayHandler) + .add("keepAliveInterval=" + keepAliveInterval) + .add("errorMapper=" + errorMapper) + .add("server=" + server) + .add("loopResources=" + loopResources) + .toString(); + } + + public static class Builder { + + private GatewayOptions options; + private GatewaySessionHandler gatewayHandler = GatewaySessionHandler.DEFAULT_INSTANCE; + private Duration keepAliveInterval = Duration.ZERO; + private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; + + public Builder() {} + + public GatewayOptions options() { + return options; + } + + public Builder options(GatewayOptions options) { + this.options = options; + return this; + } + + public GatewaySessionHandler gatewayHandler() { + return gatewayHandler; + } + + public Builder gatewayHandler(GatewaySessionHandler gatewayHandler) { + this.gatewayHandler = gatewayHandler; + return this; + } + + public Duration keepAliveInterval() { + return keepAliveInterval; + } + + public Builder keepAliveInterval(Duration keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + + public ServiceProviderErrorMapper errorMapper() { + return errorMapper; + } + + public Builder errorMapper(ServiceProviderErrorMapper errorMapper) { + this.errorMapper = errorMapper; + return this; + } + + public WebsocketGateway build() { + return new WebsocketGateway(this); + } + } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java index d7b2ca0b0..e65464ecf 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/CorsTest.java @@ -60,11 +60,15 @@ void testCrossOriginRequest() { gatewayBuilder .gateway( opts -> - new HttpGateway(opts.id("http").port(HTTP_PORT)) - .corsEnabled(true) - .corsConfig( - config -> - config.allowedRequestHeaders("Content-Type", "X-Correlation-ID"))) + new HttpGateway( + builder -> + builder + .options(opts.id("http").port(HTTP_PORT)) + .corsEnabled(true) + .corsConfigBuilder( + corsConfigBuilder -> + corsConfigBuilder.allowedRequestHeaders( + "Content-Type", "X-Correlation-ID")))) .start() .block(TIMEOUT); @@ -113,7 +117,9 @@ void testCrossOriginRequest() { void testOptionRequestCorsDisabled() { gateway = gatewayBuilder - .gateway(opts -> new HttpGateway(opts.id("http").port(HTTP_PORT)).corsEnabled(false)) + .gateway( + opts -> + new HttpGateway(builder -> builder.options(opts.id("http").port(HTTP_PORT)))) .start() .block(TIMEOUT); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java index caf5b4214..266ac97a5 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java @@ -49,7 +49,7 @@ void beforEach() { .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) - .gateway(options -> new HttpGateway(options.id("HTTP"))) + .gateway(options -> new HttpGateway(builder -> builder.options(options.id("HTTP")))) .startAwait(); gatewayAddress = gateway.gateway("HTTP").address(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayExtension.java index 4a8c6c036..e18e05e63 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpGatewayExtension.java @@ -15,7 +15,7 @@ class HttpGatewayExtension extends AbstractGatewayExtension { HttpGatewayExtension(ServiceInfo serviceInfo) { super( serviceInfo, - opts -> new HttpGateway(opts.id(GATEWAY_ALIAS_NAME)), + opts -> new HttpGateway(builder -> builder.options(opts.id(GATEWAY_ALIAS_NAME))), GatewayClientTransports::httpGatewayClientTransport); } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayErrorMapperTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayErrorMapperTest.java index 068631242..be142db07 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayErrorMapperTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayErrorMapperTest.java @@ -21,7 +21,12 @@ class HttpLocalGatewayErrorMapperTest extends BaseTest { static HttpLocalGatewayExtension extension = new HttpLocalGatewayExtension( ServiceInfo.fromServiceInstance(new ErrorServiceImpl()).errorMapper(ERROR_MAPPER).build(), - opts -> new HttpGateway(opts.call(opts.call().errorMapper(ERROR_MAPPER)), ERROR_MAPPER)); + opts -> + new HttpGateway( + builder -> + builder + .options(opts.call(opts.call().errorMapper(ERROR_MAPPER))) + .errorMapper(ERROR_MAPPER))); private ErrorService service; diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayExtension.java index a5626c090..bf57d041e 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/http/HttpLocalGatewayExtension.java @@ -15,7 +15,7 @@ class HttpLocalGatewayExtension extends AbstractLocalGatewayExtension { } HttpLocalGatewayExtension(ServiceInfo serviceInfo) { - this(serviceInfo, HttpGateway::new); + this(serviceInfo, opts -> new HttpGateway(builder -> builder.options(opts))); } HttpLocalGatewayExtension( diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index b27baea25..61458e667 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -68,7 +68,11 @@ void beforEach() { .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) - .gateway(options -> new WebsocketGateway(options.id("WS"), sessionEventHandler)) + .gateway( + options -> + new WebsocketGateway( + builder -> + builder.options(options.id("WS")).gatewayHandler(sessionEventHandler))) .startAwait(); gatewayAddress = gateway.gateway("WS").address(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index 34b6e6ef1..87252a807 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -55,7 +55,12 @@ static void beforeAll() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> new WebsocketGateway(options.id("WS"), new TestGatewaySessionHandler())) + options -> + new WebsocketGateway( + builder -> + builder + .options(options.id("WS")) + .gatewayHandler(new TestGatewaySessionHandler()))) .startAwait(); gatewayAddress = gateway.gateway("WS").address(); diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayExtension.java index 24287dab3..19b01588b 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketGatewayExtension.java @@ -16,7 +16,7 @@ class WebsocketGatewayExtension extends AbstractGatewayExtension { WebsocketGatewayExtension(ServiceInfo serviceInfo) { super( serviceInfo, - opts -> new WebsocketGateway(opts.id(GATEWAY_ALIAS_NAME)), + opts -> new WebsocketGateway(builder -> builder.options(opts.id(GATEWAY_ALIAS_NAME))), GatewayClientTransports::websocketGatewayClientTransport); } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayErrorMapperTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayErrorMapperTest.java index 8a5de923b..c698caacc 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayErrorMapperTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayErrorMapperTest.java @@ -21,7 +21,11 @@ class WebsocketLocalGatewayErrorMapperTest extends BaseTest { new WebsocketLocalGatewayExtension( ServiceInfo.fromServiceInstance(new ErrorServiceImpl()).errorMapper(ERROR_MAPPER).build(), opts -> - new WebsocketGateway(opts.call(opts.call().errorMapper(ERROR_MAPPER)), ERROR_MAPPER)); + new WebsocketGateway( + builder -> + builder + .options(opts.call(opts.call().errorMapper(ERROR_MAPPER))) + .errorMapper(ERROR_MAPPER))); private ErrorService service; diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayExtension.java index 40f8af774..ce213fe40 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalGatewayExtension.java @@ -16,7 +16,7 @@ class WebsocketLocalGatewayExtension extends AbstractLocalGatewayExtension { } WebsocketLocalGatewayExtension(ServiceInfo serviceInfo) { - this(serviceInfo, WebsocketGateway::new); + this(serviceInfo, opts -> new WebsocketGateway(builder -> builder.options(opts))); } WebsocketLocalGatewayExtension( diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalWithAuthExtension.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalWithAuthExtension.java index 6397c7322..15a703449 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalWithAuthExtension.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketLocalWithAuthExtension.java @@ -20,7 +20,10 @@ public class WebsocketLocalWithAuthExtension extends AbstractLocalGatewayExtensi serviceInfo, opts -> new WebsocketGateway( - opts.id(GATEWAY_ALIAS_NAME), new GatewaySessionHandlerImpl(authReg)), + builder -> + builder + .options(opts.id(GATEWAY_ALIAS_NAME)) + .gatewayHandler(new GatewaySessionHandlerImpl(authReg))), GatewayClientTransports::websocketGatewayClientTransport); } } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index 14124e42a..02273bdea 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -54,7 +54,12 @@ static void beforeAll() { .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .gateway( - options -> new WebsocketGateway(options.id("WS"), new TestGatewaySessionHandler())) + options -> + new WebsocketGateway( + builder -> + builder + .options(options.id("WS")) + .gatewayHandler(new TestGatewaySessionHandler()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl()) .startAwait();