Skip to content

Commit

Permalink
Support of heartbeat for websocket gateway (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v authored Oct 13, 2024
1 parent d2a1fc1 commit 07c9b05
Show file tree
Hide file tree
Showing 19 changed files with 176 additions and 214 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.services.gateway;

import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.registry.api.ServiceRegistry;

public interface Gateway {

Expand All @@ -21,9 +23,11 @@ public interface Gateway {
/**
* Starts gateway.
*
* @param call {@link ServiceCall} instance
* @param serviceRegistry {@link ServiceRegistry} instance
* @return gateway instance
*/
Gateway start();
Gateway start(ServiceCall call, ServiceRegistry serviceRegistry);

/** Stops gateway. */
void stop();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface ServiceTransport {
/**
* Provider for {@link ServerTransport}.
*
* @param serviceRegistry serviceRegistry
* @param serviceRegistry {@link ServiceRegistry} instance
* @return {@code ServerTransport} instance
*/
ServerTransport serverTransport(ServiceRegistry serviceRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.Gateway;
import io.scalecube.services.gateway.GatewayOptions;
import io.scalecube.services.registry.api.ServiceRegistry;
import java.net.InetSocketAddress;
import java.util.StringJoiner;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;

public class HttpGateway implements Gateway {

private final GatewayOptions options;
private final String id;
private final int port;
private final Function<ServiceCall, ServiceCall> callFactory;
private final ServiceProviderErrorMapper errorMapper;
private final boolean corsEnabled;
private final CorsConfigBuilder corsConfigBuilder;
Expand All @@ -26,28 +29,35 @@ public class HttpGateway implements Gateway {
private LoopResources loopResources;

private HttpGateway(Builder builder) {
this.options = builder.options;
this.id = builder.id;
this.port = builder.port;
this.callFactory = builder.callFactory;
this.errorMapper = builder.errorMapper;
this.corsEnabled = builder.corsEnabled;
this.corsConfigBuilder = builder.corsConfigBuilder;
}

@Override
public String id() {
return options.id();
return id;
}

@Override
public Gateway start() {
HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper);

public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
loopResources =
LoopResources.create(
options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true);
LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true);

try {
prepareHttpServer(loopResources, options.port())
.handle(gatewayAcceptor)
HttpServer.create()
.runOn(loopResources)
.bindAddress(() -> new InetSocketAddress(port))
.doOnConnection(
connection -> {
if (corsEnabled) {
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
}
})
.handle(new HttpGatewayAcceptor(callFactory.apply(call), errorMapper))
.bind()
.doOnSuccess(server -> this.server = server)
.toFuture()
Expand All @@ -59,23 +69,6 @@ public Gateway start() {
return this;
}

private HttpServer prepareHttpServer(LoopResources loopResources, int port) {
HttpServer httpServer = HttpServer.create();

if (loopResources != null) {
httpServer = httpServer.runOn(loopResources);
}

return httpServer
.bindAddress(() -> new InetSocketAddress(port))
.doOnConnection(
connection -> {
if (corsEnabled) {
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
}
});
}

@Override
public Address address() {
InetSocketAddress address = (InetSocketAddress) server.address();
Expand All @@ -100,21 +93,11 @@ private void shutdownLoopResources(LoopResources loopResources) {
}
}

@Override
public String toString() {
return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]")
.add("options=" + options)
.add("errorMapper=" + errorMapper)
.add("corsEnabled=" + corsEnabled)
.add("corsConfigBuilder=" + corsConfigBuilder)
.add("server=" + server)
.add("loopResources=" + loopResources)
.toString();
}

public static class Builder {

private GatewayOptions options;
private String id = "http@" + Integer.toHexString(hashCode());
private int port;
private Function<ServiceCall, ServiceCall> callFactory = call -> call;
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
private boolean corsEnabled = false;
private CorsConfigBuilder corsConfigBuilder =
Expand All @@ -125,12 +108,26 @@ public static class Builder {

public Builder() {}

public GatewayOptions options() {
return options;
public String id() {
return id;
}

public Builder id(String id) {
this.id = id;
return this;
}

public int port() {
return port;
}

public Builder port(int port) {
this.port = port;
return this;
}

public Builder options(GatewayOptions options) {
this.options = options;
public Builder serviceCall(Function<ServiceCall, ServiceCall> operator) {
callFactory = callFactory.andThen(operator);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.gateway.ReferenceCountUtil;
import io.scalecube.services.transport.api.DataCodec;
Expand All @@ -37,10 +36,6 @@ public class HttpGatewayAcceptor
private final ServiceCall serviceCall;
private final ServiceProviderErrorMapper errorMapper;

HttpGatewayAcceptor(ServiceCall serviceCall) {
this(serviceCall, DefaultErrorMapper.INSTANCE);
}

HttpGatewayAcceptor(ServiceCall serviceCall, ServiceProviderErrorMapper errorMapper) {
this.serviceCall = serviceCall;
this.errorMapper = errorMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.scalecube.services.gateway.websocket;

import io.scalecube.services.annotations.Service;
import io.scalecube.services.annotations.ServiceMethod;
import reactor.core.publisher.Mono;

@Service(HeartbeatService.NAMESPACE)
public interface HeartbeatService {

String NAMESPACE = "v1/scalecube.websocket.heartbeat";

@ServiceMethod
Mono<Long> ping(long value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.scalecube.services.gateway.websocket;

import reactor.core.publisher.Mono;

public class HeartbeatServiceImpl implements HeartbeatService {

@Override
public Mono<Long> ping(long value) {
return Mono.just(value);
}
}
Loading

0 comments on commit 07c9b05

Please sign in to comment.