diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java index c0df446b..e474ca6b 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Cluster.java @@ -3,6 +3,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.net.Address; import java.util.Collection; +import java.util.List; import java.util.Optional; import reactor.core.publisher.Mono; @@ -14,7 +15,7 @@ public interface Cluster { * * @return cluster address */ - Address address(); + List
addresses(); /** * Spreads given message between cluster members using gossiping protocol. diff --git a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java index ee6a1b09..39feb4b5 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java @@ -5,6 +5,8 @@ import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.metadata.MetadataCodec; import io.scalecube.cluster.transport.api.TransportConfig; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.StringJoiner; import java.util.function.UnaryOperator; @@ -35,8 +37,7 @@ public final class ClusterConfig implements Cloneable { private String memberId; private String memberAlias; - private String externalHost; - private Integer externalPort; + private List externalHosts; private TransportConfig transportConfig = TransportConfig.defaultConfig(); private FailureDetectorConfig failureDetectorConfig = FailureDetectorConfig.defaultConfig(); @@ -136,27 +137,39 @@ public ClusterConfig metadataCodec(MetadataCodec metadataCodec) { } /** - * Returns externalHost. {@code externalHost} is a config property for container environments, - * it's being set for advertising to scalecube cluster some connectable hostname which maps to + * Returns externalHosts. {@code externalHosts} is a config property for container environments, + * it's being set for advertising to scalecube cluster some connectable hostnames which maps to * scalecube transport's hostname on which scalecube transport is listening. * - * @return external host + * @return external hosts */ - public String externalHost() { - return externalHost; + public List externalHosts() { + return externalHosts; } /** - * Setter for externalHost. {@code externalHost} is a config property for container environments, - * it's being set for advertising to scalecube cluster some connectable hostname which maps to - * scalecube transport's hostname on which scalecube transport is listening. + * Setter for externalHosts. {@code externalHosts} is a config property for container + * environments, it's being set for advertising to scalecube cluster some connectable hostnames + * which maps to scalecube transport's hostname on which scalecube transport is listening. + * + * @param externalHosts external hosts + * @return new {@code ClusterConfig} instance + */ + public ClusterConfig externalHosts(String... externalHosts) { + return externalHosts(Arrays.asList(externalHosts)); + } + + /** + * Setter for externalHosts. {@code externalHosts} is a config property for container + * environments, it's being set for advertising to scalecube cluster some connectable hostnames + * which maps to scalecube transport's hostname on which scalecube transport is listening. * - * @param externalHost external host + * @param externalHosts external hosts * @return new {@code ClusterConfig} instance */ - public ClusterConfig externalHost(String externalHost) { + public ClusterConfig externalHosts(List externalHosts) { ClusterConfig c = clone(); - c.externalHost = externalHost; + c.externalHosts = externalHosts; return c; } @@ -205,31 +218,6 @@ public ClusterConfig memberAlias(String memberAlias) { return c; } - /** - * Returns externalPort. {@code externalPort} is a config property for container environments, - * it's being set for advertising to scalecube cluster a port which mapped to scalecube - * transport's listening port. - * - * @return external port - */ - public Integer externalPort() { - return externalPort; - } - - /** - * Setter for externalPort. {@code externalPort} is a config property for container environments, - * it's being set for advertising to scalecube cluster a port which mapped to scalecube - * transport's listening port. - * - * @param externalPort external port - * @return new {@code ClusterConfig} instance - */ - public ClusterConfig externalPort(Integer externalPort) { - ClusterConfig c = clone(); - c.externalPort = externalPort; - return c; - } - /** * Applies {@link TransportConfig} settings. * @@ -316,8 +304,7 @@ public String toString() { .add("metadataCodec=" + metadataCodec) .add("memberId='" + memberId + "'") .add("memberAlias='" + memberAlias + "'") - .add("externalHost='" + externalHost + "'") - .add("externalPort=" + externalPort) + .add("externalHosts=" + externalHosts) .add("transportConfig=" + transportConfig) .add("failureDetectorConfig=" + failureDetectorConfig) .add("gossipConfig=" + gossipConfig) diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index 65dc7835..42b958c9 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -6,6 +6,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.StringJoiner; import java.util.UUID; @@ -20,7 +23,7 @@ public final class Member implements Externalizable { private String id; private String alias; - private Address address; + private List
addresses; private String namespace; public Member() {} @@ -28,16 +31,28 @@ public Member() {} /** * Constructor. * - * @param id member id; not null + * @param id member id * @param alias member alias (optional) - * @param address member address; not null - * @param namespace namespace; not null + * @param addresses member addresses + * @param namespace namespace + */ + public Member(String id, String alias, List
addresses, String namespace) { + this.id = Objects.requireNonNull(id, "id"); + this.alias = alias; + this.addresses = Objects.requireNonNull(addresses, "addresses"); + this.namespace = Objects.requireNonNull(namespace, "namespace"); + } + + /** + * Constructor. + * + * @param id member id + * @param alias member alias (optional) + * @param address member address + * @param namespace namespace */ public Member(String id, String alias, Address address, String namespace) { - this.id = Objects.requireNonNull(id, "member id"); - this.alias = alias; // optional - this.address = Objects.requireNonNull(address, "member address"); - this.namespace = Objects.requireNonNull(namespace, "member namespace"); + this(id, alias, Collections.singletonList(address), namespace); } /** @@ -70,14 +85,14 @@ public String namespace() { } /** - * Returns cluster member address, an address on which this cluster member listens connections - * from other cluster members. + * Returns cluster member addresses, those are addresses on which this cluster member listens + * connections from other cluster members. * * @see io.scalecube.cluster.transport.api.TransportConfig#port(int) * @return member address */ - public Address address() { - return address; + public List
addresses() { + return addresses; } @Override @@ -90,13 +105,13 @@ public boolean equals(Object that) { } Member member = (Member) that; return Objects.equals(id, member.id) - && Objects.equals(address, member.address) + && Objects.equals(addresses, member.addresses) && Objects.equals(namespace, member.namespace); } @Override public int hashCode() { - return Objects.hash(id, address, namespace); + return Objects.hash(id, addresses, namespace); } @Override @@ -110,7 +125,10 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(alias); } // address - out.writeUTF(address.toString()); + out.writeInt(addresses.size()); + for (Address address : addresses) { + out.writeUTF(address.toString()); + } // namespace out.writeUTF(namespace); } @@ -124,8 +142,12 @@ public void readExternal(ObjectInput in) throws IOException { if (aliasNotNull) { alias = in.readUTF(); } - // address - address = Address.from(in.readUTF()); + // addresses + final int addressesSize = in.readInt(); + addresses = new ArrayList<>(addressesSize); + for (int i = 0; i < addressesSize; i++) { + addresses.add(Address.from(in.readUTF())); + } // namespace this.namespace = in.readUTF(); } @@ -143,9 +165,9 @@ private static String stringifyId(String id) { public String toString() { StringJoiner stringJoiner = new StringJoiner(":"); if (alias == null) { - return stringJoiner.add(namespace).add(stringifyId(id) + "@" + address).toString(); + return stringJoiner.add(namespace).add(stringifyId(id)).toString(); } else { - return stringJoiner.add(namespace).add(alias).add(stringifyId(id) + "@" + address).toString(); + return stringJoiner.add(namespace).add(alias).add(stringifyId(id)).toString(); } } } diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java index d51a7526..2239e8f9 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java @@ -1,10 +1,12 @@ package io.scalecube.cluster.utils; +import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.net.Address; import java.time.Duration; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; @@ -213,15 +215,44 @@ public InboundSettings inboundSettings(Address destination) { return inboundSettings.getOrDefault(destination, defaultInboundSettings); } + /** + * Returns network inbound settings applied to the given destination. + * + * @param member target member + * @return network inbound settings + */ + public InboundSettings inboundSettings(Member member) { + final List
destinations = member.addresses(); + + if (destinations.isEmpty()) { + return defaultInboundSettings; + } + + for (Address destination : destinations) { + InboundSettings inboundSettings = this.inboundSettings.get(destination); + + if (inboundSettings != null) { + return inboundSettings; + } + } + + return defaultInboundSettings; + } + /** * Setter for network emulator inbound settings for specific destination. * * @param shallPass shallPass inbound flag */ - public void inboundSettings(Address destination, boolean shallPass) { + public void inboundSettings(Member member, boolean shallPass) { + final List
destinations = member.addresses(); InboundSettings settings = new InboundSettings(shallPass); - inboundSettings.put(destination, settings); - LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); + + destinations.forEach( + destination -> { + inboundSettings.put(destination, settings); + LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination); + }); } /** diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java index 381042c5..c47ebf34 100644 --- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java +++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulatorTransport.java @@ -1,5 +1,6 @@ package io.scalecube.cluster.utils; +import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; @@ -49,7 +50,7 @@ public boolean isStopped() { public Mono send(Address address, Message message) { return Mono.defer( () -> - Mono.just(enhanceWithSender(message)) + Mono.just(Message.with(message).build()) .flatMap(msg -> networkEmulator.tryFailOutbound(msg, address)) .flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address)) .flatMap(msg -> transport.send(address, msg))); @@ -59,7 +60,7 @@ public Mono send(Address address, Message message) { public Mono requestResponse(Address address, Message request) { return Mono.defer( () -> - Mono.just(enhanceWithSender(request)) + Mono.just(Message.with(request).build()) .flatMap(msg -> networkEmulator.tryFailOutbound(msg, address)) .flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address)) .flatMap( @@ -69,7 +70,9 @@ public Mono requestResponse(Address address, Message request) { .flatMap( message -> { boolean shallPass = - networkEmulator.inboundSettings(message.sender()).shallPass(); + networkEmulator + .inboundSettings((Member) message.sender()) + .shallPass(); return shallPass ? Mono.just(message) : Mono.never(); }))); } @@ -78,11 +81,7 @@ public Mono requestResponse(Address address, Message request) { public Flux listen() { return transport .listen() - .filter(message -> networkEmulator.inboundSettings(message.sender()).shallPass()) + .filter(message -> networkEmulator.inboundSettings((Member) message.sender()).shallPass()) .onBackpressureBuffer(); } - - private Message enhanceWithSender(Message message) { - return Message.with(message).sender(transport.address()).build(); - } } diff --git a/cluster-tests/pom.xml b/cluster-tests/pom.xml new file mode 100644 index 00000000..80d641d5 --- /dev/null +++ b/cluster-tests/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + + io.scalecube + scalecube-cluster-parent + 2.6.18-SNAPSHOT + + + cluster-tests + + + + ${project.groupId} + scalecube-cluster + ${project.version} + + + ${project.groupId} + scalecube-cluster-testlib + ${project.version} + + + ${project.groupId} + scalecube-transport-netty + ${project.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-core + + + + diff --git a/cluster-tests/src/test/java/io/scalecube/transport/netty/TcpTransportTest.java b/cluster-tests/src/test/java/io/scalecube/transport/netty/TcpTransportTest.java new file mode 100644 index 00000000..4a21f74b --- /dev/null +++ b/cluster-tests/src/test/java/io/scalecube/transport/netty/TcpTransportTest.java @@ -0,0 +1,101 @@ +//package io.scalecube.transport.netty.tcp; +// +//import static org.junit.jupiter.api.Assertions.assertEquals; +//import static org.junit.jupiter.api.Assertions.assertNotNull; +//import static org.junit.jupiter.api.Assertions.assertNull; +//import static org.junit.jupiter.api.Assertions.assertTrue; +//import static org.junit.jupiter.api.Assertions.fail; +// +//import io.scalecube.cluster.Member; +//import io.scalecube.cluster.TransportWrapper; +//import io.scalecube.cluster.transport.api.Message; +//import io.scalecube.cluster.utils.NetworkEmulatorTransport; +//import io.scalecube.net.Address; +//import io.scalecube.transport.netty.BaseTest; +//import java.io.IOException; +//import java.net.UnknownHostException; +//import java.time.Duration; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.concurrent.CompletableFuture; +//import java.util.concurrent.TimeUnit; +//import java.util.concurrent.TimeoutException; +//import org.junit.jupiter.api.AfterEach; +//import org.junit.jupiter.api.Test; +//import org.junit.jupiter.api.TestInfo; +//import reactor.core.publisher.Flux; +//import reactor.core.publisher.Sinks; +//import reactor.test.StepVerifier; +// +//public class TcpTransportTest extends BaseTest { +// +// public static final Duration TIMEOUT = Duration.ofSeconds(10); +// +// // Auto-destroyed on tear down +// private NetworkEmulatorTransport client; +// private NetworkEmulatorTransport server; +// +// /** Tear down. */ +// @AfterEach +// public final void tearDown() { +// destroyTransport(client); +// destroyTransport(server); +// } +// +// @Test +// public void testNetworkSettings() { +// client = createTcpTransport(); +// server = createTcpTransport(); +// +// int lostPercent = 50; +// int mean = 0; +// final Address address = server.address(); +// client.networkEmulator().outboundSettings(address, lostPercent, mean); +// +// final List serverMessageList = new ArrayList<>(); +// server.listen().subscribe(serverMessageList::add); +// +// int total = 1000; +// Flux.range(0, total) +// .flatMap( +// i -> +// client.send( +// address, Message.builder().sender(createMember(address)).data("q" + i).build())) +// .onErrorContinue((th, o) -> {}) +// .blockLast(TIMEOUT); +// +// int expectedMax = +// total / 100 * lostPercent + total / 100 * 5; // +5% for maximum possible lost messages +// int size = serverMessageList.size(); +// assertTrue(size < expectedMax, "expectedMax=" + expectedMax + ", actual size=" + size); +// } +// +// @Test +// public void testBlockAndUnblockTraffic() throws Exception { +// client = createTcpTransport(); +// server = createTcpTransport(); +// +// server +// .listen() +// .subscribe(message -> TransportWrapper.send(server, message.sender(), message).subscribe()); +// +// Sinks.Many responses = Sinks.many().replay().all(); +// client +// .listen() +// .subscribe(responses::tryEmitNext, responses::tryEmitError, responses::tryEmitComplete); +// +// // test at unblocked transport +// send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe(); +// +// // then block client->server messages +// Thread.sleep(1000); +// client.networkEmulator().blockOutbound(server.address()); +// send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe(); +// +// StepVerifier.create(responses.asFlux()) +// .assertNext(message -> assertEquals("q/unblocked", message.qualifier())) +// .expectNoEvent(Duration.ofMillis(300)) +// .thenCancel() +// .verify(TIMEOUT); +// } +//} diff --git a/cluster-tests/src/test/java/io/scalecube/transport/netty/TransportTests.java b/cluster-tests/src/test/java/io/scalecube/transport/netty/TransportTests.java new file mode 100644 index 00000000..065894a0 --- /dev/null +++ b/cluster-tests/src/test/java/io/scalecube/transport/netty/TransportTests.java @@ -0,0 +1,375 @@ +package io.scalecube.transport.netty; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import io.scalecube.cluster.Member; +import io.scalecube.cluster.TransportWrapper; +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportConfig; +import io.scalecube.cluster.transport.api.TransportFactory; +import io.scalecube.net.Address; +import io.scalecube.transport.netty.tcp.TcpTransportFactory; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; +import java.io.IOException; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import java.util.stream.Stream.Builder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class TransportTests { + + public static final Logger LOGGER = LoggerFactory.getLogger(TransportTests.class); + + public static final TransportConfig TRANSPORT_CONFIG = TransportConfig.defaultConfig(); + public static final String NS = "namespace"; + public static final Duration TIMEOUT = Duration.ofSeconds(3); + + private Context context; + + @BeforeEach + void beforeEach(TestInfo testInfo) { + LOGGER.info("***** Test started - " + testInfo.getDisplayName() + " *****"); + } + + @AfterEach + void afterEach(TestInfo testInfo) { + if (context != null) { + context.close(); + } + + LOGGER.info("***** Test finished - " + testInfo.getDisplayName() + " *****"); + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testUnresolvedHostConnection(Context context) { + this.context = context; + + try { + Address address = Address.from("wronghost:49255"); + Message message = Message.builder().sender(createMember(address)).data("q").build(); + context.sender.send(address, message).block(Duration.ofSeconds(20)); + fail("fail"); + } catch (Exception e) { + assertEquals( + UnknownHostException.class, e.getCause().getClass(), "Unexpected exception class"); + } + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testInteractWithNoConnection(Context context) { + this.context = context; + + Address serverAddress = Address.from("localhost:49255"); + Member sender = createMember(serverAddress); + + for (int i = 0; i < 10; i++) { + Transport transport = context.createTransport(); + + // create transport and don't wait just send message + try { + Message msg = Message.builder().sender(sender).data("q").build(); + transport.send(serverAddress, msg).block(TIMEOUT); + fail("fail"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); + } + + // send second message: no connection yet and it's clear that there's no connection + try { + Message msg = Message.builder().sender(sender).data("q").build(); + transport.send(serverAddress, msg).block(TIMEOUT); + fail("fail"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); + } + + transport.stop().block(TIMEOUT); + } + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testConnect(Context context) { + this.context = context; + + // Create client and try send msg to not-yet existing server + + final int serverPort = 4801; + final Transport client = context.createTransport(); + final Member clientMember = + new Member("client", null, Collections.singletonList(client.address()), NS); + final Member serverMember = + new Member( + "server", null, Collections.singletonList(Address.create("localhost", serverPort)), NS); + + // Verify send-error on client + + StepVerifier.create( + new TransportWrapper(client) + .send(serverMember, Message.builder().sender(clientMember).data("Hola!").build()) + .retry(2)) + .verifyError(); + + // Start server + + final Transport server = + Transport.bindAwait( + TransportConfig.defaultConfig() + .port(serverPort) + .transportFactory(context.transportFactory)); + + // Verify send-success on client + + StepVerifier.create( + new TransportWrapper(client) + .send(serverMember, Message.builder().sender(clientMember).data("Hola!").build()) + .repeat(2)) + .verifyComplete(); + + client.stop().block(TIMEOUT); + server.stop().block(TIMEOUT); + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testPingPong(Context context) { + this.context = context; + + final Transport receiver = context.receiver; + final Member senderMember = context.senderMember; + final Member receiverMember = context.receiverMember; + + receiver + .listen() + .subscribe( + message -> { + final Member sender = message.sender(); + final List
addresses = sender.addresses(); + + assertEquals(senderMember, sender, "sender"); + + receiver + .send( + addresses.get(0), + Message.builder().sender(receiverMember).data("hi client").build()) + .subscribe(); + }); + + final Transport sender = context.sender; + CompletableFuture messageFuture = new CompletableFuture<>(); + sender.listen().subscribe(messageFuture::complete); + + sender + .send( + receiver.address(), Message.builder().sender(senderMember).data("hello server").build()) + .subscribe(); + + Message result = Mono.fromFuture(messageFuture).block(TIMEOUT); + assertNotNull(result, "No response from serverAddress"); + assertEquals("hi client", result.data()); + assertEquals(receiverMember, result.sender(), "sender"); + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testPingPongWithTransportWrapper(Context context) { + this.context = context; + + final Transport receiver = context.receiver; + final Member senderMember = context.senderMember; + final TransportWrapper receiverWrapper = context.receiverWrapper; + final Member receiverMember = context.receiverMember; + + receiver + .listen() + .subscribe( + message -> { + assertEquals(senderMember, message.sender(), "sender"); + + receiverWrapper + .send( + message.sender(), + Message.builder().sender(receiverMember).data("hi client").build()) + .subscribe(); + }); + + CompletableFuture messageFuture = new CompletableFuture<>(); + context.sender.listen().subscribe(messageFuture::complete); + + final Message ping = Message.builder().sender(senderMember).data("hello server").build(); + context.senderWrapper.send(receiverMember, ping).subscribe(); + + Message result = Mono.fromFuture(messageFuture).block(TIMEOUT); + assertNotNull(result, "No response from serverAddress"); + assertEquals("hi client", result.data()); + assertEquals(receiverMember, result.sender(), "sender"); + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testRequestResponse(Context context) { + this.context = context; + + final Transport receiver = context.receiver; + final Member senderMember = context.senderMember; + final Member receiverMember = context.receiverMember; + + receiver + .listen() + .filter(req -> req.qualifier().equals("hello/server")) + .subscribe( + message -> { + final Member sender = message.sender(); + final List
addresses = sender.addresses(); + + assertEquals(senderMember, sender, "sender"); + + receiver + .send( + addresses.get(0), + Message.builder() + .correlationId(message.correlationId()) + .sender(receiverMember) + .data("hello: " + message.data()) + .build()) + .subscribe(); + }); + + Message result = + context + .sender + .requestResponse( + receiver.address(), + Message.builder() + .qualifier("hello/server") + .correlationId("" + System.nanoTime()) + .sender(senderMember) + .data("server") + .build()) + .block(TIMEOUT); + + //noinspection ConstantConditions + assertEquals("hello: server", result.data().toString(), "data"); + assertEquals(receiverMember, result.sender(), "sender"); + } + + @ParameterizedTest + @MethodSource("transportContexts") + public void testRequestResponseWithTransportWrapper(Context context) { + this.context = context; + + final Transport receiver = context.receiver; + final Member senderMember = context.senderMember; + final TransportWrapper receiverWrapper = context.receiverWrapper; + final Member receiverMember = context.receiverMember; + + receiver + .listen() + .filter(req -> req.qualifier().equals("hello/server")) + .subscribe( + message -> { + assertEquals(senderMember, message.sender(), "sender"); + + receiverWrapper + .send( + message.sender(), + Message.builder() + .correlationId(message.correlationId()) + .sender(receiverMember) + .data("hello: " + message.data()) + .build()) + .subscribe(); + }); + + Message result = + context + .sender + .requestResponse( + receiver.address(), + Message.builder() + .qualifier("hello/server") + .correlationId("" + System.nanoTime()) + .sender(senderMember) + .data("server") + .build()) + .block(TIMEOUT); + + //noinspection ConstantConditions + assertEquals("hello: server", result.data().toString(), "data"); + assertEquals(receiverMember, result.sender(), "sender"); + } + + private static Stream transportContexts() { + final Builder builder = Stream.builder(); + + builder.add(Arguments.of(new Context(new TcpTransportFactory()))); + builder.add(Arguments.of(new Context(new WebsocketTransportFactory()))); + + return builder.build(); + } + + private static Member createMember(Address address) { + return new Member("0", null, address, "NAMESPACE"); + } + + private static class Context implements AutoCloseable { + + private final Transport receiver; + private final Transport sender; + private final Member receiverMember; + private final Member senderMember; + private final TransportFactory transportFactory; + private final TransportWrapper receiverWrapper; + private final TransportWrapper senderWrapper; + + public Context(TransportFactory transportFactory) { + this.transportFactory = transportFactory; + receiver = Transport.bindAwait(TRANSPORT_CONFIG.transportFactory(transportFactory)); + sender = Transport.bindAwait(TRANSPORT_CONFIG.transportFactory(transportFactory)); + + receiverMember = + new Member("receiver", null, Collections.singletonList(receiver.address()), NS); + senderMember = new Member("sender", null, Collections.singletonList(sender.address()), NS); + + receiverWrapper = new TransportWrapper(receiver); + senderWrapper = new TransportWrapper(sender); + } + + private Transport createTransport() { + return Transport.bindAwait(TRANSPORT_CONFIG.transportFactory(transportFactory)); + } + + @Override + public void close() { + receiver.stop().block(TIMEOUT); + sender.stop().block(TIMEOUT); + } + + @Override + public String toString() { + return new StringJoiner(", ", Context.class.getSimpleName() + "[", "]") + .add("transportFactory=" + transportFactory.getClass().getSimpleName()) + .toString(); + } + } +} diff --git a/transport-parent/transport-netty/src/test/resources/log4j2-test.xml b/cluster-tests/src/test/resources/log4j2-test.xml similarity index 100% rename from transport-parent/transport-netty/src/test/resources/log4j2-test.xml rename to cluster-tests/src/test/resources/log4j2-test.xml diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 4dee1227..bac1870d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -20,8 +20,10 @@ import io.scalecube.utils.ServiceLoaderUtil; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -241,9 +243,11 @@ private Mono doStart0() { .flatMap( boundTransport -> { localMember = createLocalMember(boundTransport.address()); - transport = new SenderAwareTransport(boundTransport, localMember.address()); + transport = boundTransport; - scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); + final String name = + "sc-cluster-" + Integer.toHexString(System.identityHashCode(this)); + scheduler = Schedulers.newSingle(name, true); failureDetector = new FailureDetectorImpl( @@ -369,24 +373,35 @@ private Flux listenMembership() { * @return local cluster member with cluster address and cluster member id */ private Member createLocalMember(Address address) { - int port = Optional.ofNullable(config.externalPort()).orElse(address.port()); + final int port = address.port(); + final List
memberAddresses = new ArrayList<>(); - // calculate local member cluster address - Address memberAddress = - Optional.ofNullable(config.externalHost()) - .map(host -> Address.create(host, port)) - .orElseGet(() -> Address.create(address.host(), port)); + if (config.transportConfig().exposeAddress()) { + memberAddresses.add(address); + } + + final List externalHosts = config.externalHosts(); + if (externalHosts != null) { + for (String externalHost : externalHosts) { + memberAddresses.add(Address.create(externalHost, port)); + } + } + + if (memberAddresses.isEmpty()) { + throw new IllegalArgumentException("Member addresses must not be empty"); + } + + final String memberId = + config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(); + final String memberAlias = config.memberAlias(); + final String namespace = config.membershipConfig().namespace(); - return new Member( - config.memberId() != null ? config.memberId() : UUID.randomUUID().toString(), - config.memberAlias(), - memberAddress, - config.membershipConfig().namespace()); + return new Member(memberId, memberAlias, memberAddresses, namespace); } @Override - public Address address() { - return member().address(); + public List
addresses() { + return member().addresses(); } @Override @@ -491,54 +506,4 @@ private Mono dispose() { public Mono onShutdown() { return onShutdown.asMono(); } - - private static class SenderAwareTransport implements Transport { - - private final Transport transport; - private final Address address; - - private SenderAwareTransport(Transport transport, Address address) { - this.transport = Objects.requireNonNull(transport); - this.address = Objects.requireNonNull(address); - } - - @Override - public Address address() { - return transport.address(); - } - - @Override - public Mono start() { - return transport.start(); - } - - @Override - public Mono stop() { - return transport.stop(); - } - - @Override - public boolean isStopped() { - return transport.isStopped(); - } - - @Override - public Mono send(Address address, Message message) { - return Mono.defer(() -> transport.send(address, enhanceWithSender(message))); - } - - @Override - public Mono requestResponse(Address address, Message request) { - return Mono.defer(() -> transport.requestResponse(address, enhanceWithSender(request))); - } - - @Override - public Flux listen() { - return transport.listen(); - } - - private Message enhanceWithSender(Message message) { - return Message.with(message).sender(address).build(); - } - } } diff --git a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java new file mode 100644 index 00000000..5e84f3b8 --- /dev/null +++ b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java @@ -0,0 +1,66 @@ +package io.scalecube.cluster; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import reactor.core.publisher.Mono; + +public class TransportWrapper { + + private final Transport transport; + + private final Map addressIndexByMember = new ConcurrentHashMap<>(); + + public TransportWrapper(Transport transport) { + this.transport = transport; + } + + /** + * Execute request response call. + * + * @param member member + * @param request request + * @return mono result + */ + public Mono requestResponse(Member member, Message request) { + return invokeWithRetry(member, request, transport::requestResponse); + } + + /** + * Execute send call. + * + * @param member member + * @param request request + * @return mono result + */ + public Mono send(Member member, Message request) { + return invokeWithRetry(member, request, transport::send); + } + + private Mono invokeWithRetry( + Member member, Message request, BiFunction> function) { + return Mono.defer( + () -> { + final List
addresses = member.addresses(); + final Integer index = addressIndexByMember.computeIfAbsent(member, m -> 0); + final AtomicInteger currentIndex = new AtomicInteger(index); + + return Mono.defer( + () -> { + if (currentIndex.get() == addresses.size()) { + currentIndex.set(0); + } + final Address address = addresses.get(currentIndex.get()); + return function.apply(address, request); + }) + .doOnSuccess(s -> addressIndexByMember.put(member, currentIndex.get())) + .doOnError(ex -> currentIndex.incrementAndGet()) + .retry(addresses.size() - 1); + }); + } +} diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index c7540bc0..ed9aa2cf 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -3,6 +3,7 @@ import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; import io.scalecube.cluster.Member; +import io.scalecube.cluster.TransportWrapper; import io.scalecube.cluster.fdetector.PingData.AckType; import io.scalecube.cluster.membership.MemberStatus; import io.scalecube.cluster.membership.MembershipEvent; @@ -26,6 +27,7 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; +@SuppressWarnings({"FieldCanBeLocal", "unused"}) public final class FailureDetectorImpl implements FailureDetector { private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class); @@ -41,6 +43,7 @@ public final class FailureDetectorImpl implements FailureDetector { private final Member localMember; private final Transport transport; private final FailureDetectorConfig config; + private final TransportWrapper transportWrapper; // State @@ -80,6 +83,7 @@ public FailureDetectorImpl( this.transport = Objects.requireNonNull(transport); this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); + transportWrapper = new TransportWrapper(transport); // Subscribe actionsDisposables.addAll( @@ -142,12 +146,18 @@ private void doPing() { // Send ping String cid = UUID.randomUUID().toString(); PingData pingData = new PingData(localMember, pingMember); - Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build(); + Message pingMsg = + Message.builder() + .sender(localMember) + .data(pingData) + .qualifier(PING) + .correlationId(cid) + .build(); LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember); - Address address = pingMember.address(); - transport - .requestResponse(address, pingMsg) + + transportWrapper + .requestResponse(pingMember, pingMsg) .timeout(Duration.ofMillis(config.pingTimeout()), scheduler) .publishOn(scheduler) .subscribe( @@ -179,7 +189,9 @@ private void doPing() { private void doPingReq( long period, final Member pingMember, final List pingReqMembers, String cid) { Message pingReqMsg = - Message.withData(new PingData(localMember, pingMember)) + Message.builder() + .sender(localMember) + .data(new PingData(localMember, pingMember)) .qualifier(PING_REQ) .correlationId(cid) .build(); @@ -189,8 +201,8 @@ private void doPingReq( Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout()); pingReqMembers.forEach( member -> - transport - .requestResponse(member.address(), pingReqMsg) + transportWrapper + .requestResponse(member, pingReqMsg) .timeout(timeout, scheduler) .publishOn(scheduler) .subscribe( @@ -232,27 +244,42 @@ private void onMessage(Message message) { /** Listens to PING message and answers with ACK. */ private void onPing(Message message) { long period = this.currentPeriod; - Address sender = message.sender(); - LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, sender); + + LOGGER.debug("[{}][{}] Received Ping from {}", localMember, period, message.sender()); + PingData data = message.data(); + final Member dataTo = data.getTo(); + final Member dataFrom = data.getFrom(); + data = data.withAckType(AckType.DEST_OK); - if (!data.getTo().id().equals(localMember.id())) { + + if (!dataTo.id().equals(localMember.id())) { LOGGER.debug( "[{}][{}] Received Ping from {} to {}, but local member is {}", localMember, period, - sender, - data.getTo(), + message.sender(), + dataTo, localMember); data = data.withAckType(AckType.DEST_GONE); } + String correlationId = message.correlationId(); + Message ackMessage = - Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = data.getFrom().address(); - LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, address); - transport - .send(address, ackMessage) + Message.builder() + .sender(localMember) + .data(data) + .qualifier(PING_ACK) + .correlationId(correlationId) + .build(); + + List
addresses = dataFrom.addresses(); + + LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses); + + transportWrapper + .send(dataFrom, ackMessage) .subscribe( null, ex -> @@ -260,7 +287,7 @@ private void onPing(Message message) { "[{}][{}] Failed to send PingAck to {}, cause: {}", localMember, period, - address, + addresses, ex.toString())); } @@ -274,11 +301,17 @@ private void onPingReq(Message message) { String correlationId = message.correlationId(); PingData pingReqData = new PingData(localMember, target, originalIssuer); Message pingMessage = - Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build(); - Address address = target.address(); - LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, address); - transport - .send(address, pingMessage) + Message.builder() + .sender(localMember) + .data(pingReqData) + .qualifier(PING) + .correlationId(correlationId) + .build(); + + LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, target.addresses()); + + transportWrapper + .send(target, pingMessage) .subscribe( null, ex -> @@ -286,7 +319,7 @@ private void onPingReq(Message message) { "[{}][{}] Failed to send transit Ping to {}, cause: {}", localMember, period, - address, + target.addresses(), ex.toString())); } @@ -304,11 +337,17 @@ private void onTransitPingAck(Message message) { String correlationId = message.correlationId(); PingData originalAckData = new PingData(target, data.getTo()).withAckType(ackType); Message originalAckMessage = - Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build(); - Address address = target.address(); - LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, address); - transport - .send(address, originalAckMessage) + Message.builder() + .sender(localMember) + .data(originalAckData) + .qualifier(PING_ACK) + .correlationId(correlationId) + .build(); + + LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, target.addresses()); + + transportWrapper + .send(target, originalAckMessage) .subscribe( null, ex -> @@ -316,7 +355,7 @@ private void onTransitPingAck(Message message) { "[{}][{}] Failed to resend transit PingAck to {}, cause: {}", localMember, period, - address, + target.addresses(), ex.toString())); } diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 1acfd060..9c9eacaa 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -4,6 +4,7 @@ import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.Member; +import io.scalecube.cluster.TransportWrapper; import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; @@ -29,6 +30,7 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; +@SuppressWarnings({"FieldCanBeLocal", "unused"}) public final class GossipProtocolImpl implements GossipProtocol { private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class); @@ -42,6 +44,7 @@ public final class GossipProtocolImpl implements GossipProtocol { private final Member localMember; private final Transport transport; private final GossipConfig config; + private final TransportWrapper transportWrapper; // Local State @@ -86,6 +89,7 @@ public GossipProtocolImpl( this.config = Objects.requireNonNull(config); this.localMember = Objects.requireNonNull(localMember); this.scheduler = Objects.requireNonNull(scheduler); + transportWrapper = new TransportWrapper(transport); // Subscribe actionsDisposables.addAll( @@ -287,14 +291,14 @@ private void spreadGossipsTo(long period, Member member) { } // Send gossip request - Address address = member.address(); + List
addresses = member.addresses(); gossips.stream() .map(this::buildGossipRequestMessage) .forEach( message -> - transport - .send(address, message) + transportWrapper + .send(member, message) .subscribe( null, ex -> @@ -303,7 +307,7 @@ private void spreadGossipsTo(long period, Member member) { localMember, period, message, - address, + addresses, ex.toString()))); } @@ -342,8 +346,11 @@ private List selectGossipMembers() { } private Message buildGossipRequestMessage(Gossip gossip) { - GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id()); - return Message.withData(gossipRequest).qualifier(GOSSIP_REQ).build(); + return Message.builder() + .sender(localMember) + .data(new GossipRequest(gossip, localMember.id())) + .qualifier(GOSSIP_REQ) + .build(); } private Set getGossipsToRemove(long period) { diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index fad3bca3..e7d8707a 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -8,6 +8,7 @@ import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.Member; +import io.scalecube.cluster.TransportWrapper; import io.scalecube.cluster.fdetector.FailureDetector; import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.fdetector.FailureDetectorEvent; @@ -36,8 +37,8 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -78,6 +79,7 @@ private enum MembershipUpdateReason { private final FailureDetector failureDetector; private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; + private final TransportWrapper transportWrapper; // State @@ -124,6 +126,7 @@ public MembershipProtocolImpl( this.scheduler = Objects.requireNonNull(scheduler); this.membershipConfig = Objects.requireNonNull(config).membershipConfig(); this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig(); + transportWrapper = new TransportWrapper(transport); // Prepare seeds seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers()); @@ -166,24 +169,31 @@ private List
cleanUpSeedMembers(Collection
seedMembers) { String hostAddress = localIpAddress.getHostAddress(); String hostName = localIpAddress.getHostName(); - Address memberAddr = localMember.address(); Address transportAddr = transport.address(); - Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); - Address memberAddByHostName = Address.create(hostName, memberAddr.port()); Address transportAddrByHostName = Address.create(hostName, transportAddr.port()); return new LinkedHashSet<>(seedMembers) .stream() - .filter(addr -> checkAddressesNotEqual(addr, memberAddr)) + .filter(addr -> checkAddressesNotEqual(addr, localMember, hostAddress, hostName)) .filter(addr -> checkAddressesNotEqual(addr, transportAddr)) - .filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress)) .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress)) - .filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName)) .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName)) .collect(Collectors.toList()); } + private boolean checkAddressesNotEqual( + Address smAddress, Member localMember, String hostAddress, String hostName) { + return localMember.addresses().stream() + .allMatch( + memberAddress -> + checkAddressesNotEqual(smAddress, memberAddress) + && checkAddressesNotEqual( + smAddress, Address.create(hostAddress, memberAddress.port())) + && checkAddressesNotEqual( + smAddress, Address.create(hostName, memberAddress.port()))); + } + private boolean checkAddressesNotEqual(Address address0, Address address1) { if (!address0.equals(address1)) { return true; @@ -326,27 +336,28 @@ public Optional member(String id) { @Override public Optional member(Address address) { - return new ArrayList<>(members.values()) - .stream().filter(member -> member.address().equals(address)).findFirst(); + return members.values().stream() + .filter(member -> member.addresses().stream().anyMatch(address::equals)) + .findFirst(); } private void doSync() { - Address address = selectSyncAddress().orElse(null); - if (address == null) { + List
addresses = selectSyncAddress(); + + if (addresses.isEmpty()) { return; } Message message = prepareSyncDataMsg(SYNC, null); - LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, address); - transport - .send(address, message) + LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses); + send(transport, addresses, message) .subscribe( null, ex -> LOGGER.debug( "[{}][doSync] Failed to send Sync to {}, cause: {}", localMember, - address, + addresses, ex.toString())); } @@ -390,13 +401,13 @@ private Mono onSyncAck(Message syncAckMsg, boolean onStart) { private Mono onSync(Message syncMsg) { return Mono.defer( () -> { - final Address sender = syncMsg.sender(); + final Member sender = syncMsg.sender(); LOGGER.debug("[{}] Received Sync from {}", localMember, sender); return syncMembership(syncMsg.data(), false) .doOnSuccess( avoid -> { - Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); - transport + final Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId()); + transportWrapper .send(sender, message) .subscribe( null, @@ -412,7 +423,10 @@ private Mono onSync(Message syncMsg) { /** Merges FD updates and processes them. */ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { - MembershipRecord r0 = membershipTable.get(fdEvent.member().id()); + final Member member = fdEvent.member(); + final List
addresses = member.addresses(); + + MembershipRecord r0 = membershipTable.get(member.id()); if (r0 == null) { // member already removed return; } @@ -425,16 +439,16 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) { // Alive won't override SUSPECT so issue instead extra sync with member to force it spread // alive with inc + 1 Message syncMsg = prepareSyncDataMsg(SYNC, null); - Address address = fdEvent.member().address(); - transport - .send(address, syncMsg) + + transportWrapper + .send(member, syncMsg) .subscribe( null, ex -> LOGGER.debug( "[{}][onFailureDetectorEvent] Failed to send Sync to {}, cause: {}", localMember, - address, + addresses, ex.toString())); } else { MembershipRecord record = @@ -464,17 +478,24 @@ private void onMembershipGossip(Message message) { } } - private Optional
selectSyncAddress() { - List
addresses = - Stream.concat(seedMembers.stream(), otherMembers().stream().map(Member::address)) - .collect(Collectors.collectingAndThen(Collectors.toSet(), ArrayList::new)); - Collections.shuffle(addresses); - if (addresses.isEmpty()) { - return Optional.empty(); - } else { - int i = ThreadLocalRandom.current().nextInt(addresses.size()); - return Optional.of(addresses.get(i)); + private List
selectSyncAddress() { + Collection otherMembers = otherMembers(); + + if (seedMembers.isEmpty() && otherMembers.isEmpty()) { + return Collections.emptyList(); } + + int totalSize = seedMembers.size() + otherMembers.size(); + int randomIndex = ThreadLocalRandom.current().nextInt(totalSize); + + if (randomIndex < seedMembers.size()) { + return Collections.singletonList(seedMembers.get(randomIndex)); + } + + List otherMembersList = new ArrayList<>(otherMembers); + Member member = otherMembersList.get(randomIndex - seedMembers.size()); + + return member.addresses(); } // ================================================ @@ -489,9 +510,12 @@ private void schedulePeriodicSync() { } private Message prepareSyncDataMsg(String qualifier, String cid) { - List membershipRecords = new ArrayList<>(membershipTable.values()); - SyncData syncData = new SyncData(membershipRecords); - return Message.withData(syncData).qualifier(qualifier).correlationId(cid).build(); + return Message.builder() + .sender(localMember) + .data(new SyncData(new ArrayList<>(membershipTable.values()))) + .qualifier(qualifier) + .correlationId(cid) + .build(); } private Mono syncMembership(SyncData syncData, boolean onStart) { @@ -508,11 +532,11 @@ private Mono syncMembership(SyncData syncData, boolean onStart) { updateMembership(r1, reason) .doOnError( ex -> - LOGGER.warn( + LOGGER.error( "[{}][syncMembership][{}][error] cause: {}", localMember, reason, - ex.toString())) + ex)) .onErrorResume(ex -> Mono.empty())) .toArray(Mono[]::new); @@ -589,7 +613,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason } // If received updated for local member then increase incarnation and spread Alive gossip - if (r1.member().address().equals(localMember.address())) { + if (r1.member().addresses().equals(localMember.addresses())) { if (r1.member().id().equals(localMember.id())) { return onSelfMemberDetected(r0, r1, reason); } else { @@ -843,13 +867,20 @@ private void spreadMembershipGossipUnlessGossiped( } } - private Mono spreadMembershipGossip(MembershipRecord r) { + private Mono spreadMembershipGossip(MembershipRecord record) { return Mono.defer( () -> { - Message msg = Message.withData(r).qualifier(MEMBERSHIP_GOSSIP).build(); - LOGGER.debug("[{}] Send membership with gossip", localMember); + final Message message = + Message.builder() + .sender(localMember) + .data(record) + .qualifier(MEMBERSHIP_GOSSIP) + .build(); + return gossipProtocol - .spread(msg) + .spread(message) + .doOnSubscribe( + subscription -> LOGGER.debug("[{}] Send membership with gossip", localMember)) .doOnError( ex -> LOGGER.debug( @@ -859,4 +890,15 @@ private Mono spreadMembershipGossip(MembershipRecord r) { .then(); }); } + + private static Mono send(Transport transport, List
addresses, Message request) { + final AtomicInteger currentIndex = new AtomicInteger(); + return Mono.defer( + () -> { + final Address address = addresses.get(currentIndex.get()); + return transport.send(address, request); + }) + .doOnError(ex -> currentIndex.incrementAndGet()) + .retry(addresses.size() - 1); + } } diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java index 35ba5328..c2646477 100644 --- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java @@ -2,9 +2,9 @@ import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.Member; +import io.scalecube.cluster.TransportWrapper; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; import java.nio.ByteBuffer; import java.time.Duration; import java.util.HashMap; @@ -36,6 +36,7 @@ public class MetadataStoreImpl implements MetadataStore { private final Member localMember; private final Transport transport; private final ClusterConfig config; + private final TransportWrapper transportWrapper; // State @@ -69,6 +70,7 @@ public MetadataStoreImpl( this.config = Objects.requireNonNull(config); this.scheduler = Objects.requireNonNull(scheduler); this.localMetadata = localMetadata; // optional + transportWrapper = new TransportWrapper(transport); } @Override @@ -148,7 +150,6 @@ public Mono fetchMetadata(Member member) { return Mono.defer( () -> { final String cid = UUID.randomUUID().toString(); - final Address targetAddress = member.address(); LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member); @@ -156,11 +157,12 @@ public Mono fetchMetadata(Member member) { Message.builder() .qualifier(GET_METADATA_REQ) .correlationId(cid) + .sender(localMember) .data(new GetMetadataRequest(member)) .build(); - return transport - .requestResponse(targetAddress, request) + return transportWrapper + .requestResponse(member, request) .timeout(Duration.ofMillis(config.metadataTimeout()), scheduler) .publishOn(scheduler) .doOnSuccess( @@ -169,7 +171,7 @@ public Mono fetchMetadata(Member member) { "[{}][{}] Received GetMetadataResp from {}", localMember, cid, - targetAddress)) + member.addresses())) .map(Message::data) .map(GetMetadataResponse::getMetadata) .doOnError( @@ -179,7 +181,7 @@ public Mono fetchMetadata(Member member) { + "from {} within {} ms, cause: {}", localMember, cid, - targetAddress, + member.addresses(), config.metadataTimeout(), th.toString())); }); @@ -196,7 +198,7 @@ private void onMessage(Message message) { } private void onMetadataRequest(Message message) { - final Address sender = message.sender(); + final Member sender = message.sender(); LOGGER.debug("[{}] Received GetMetadataReq from {}", localMember, sender); GetMetadataRequest reqData = message.data(); @@ -220,11 +222,13 @@ private void onMetadataRequest(Message message) { Message.builder() .qualifier(GET_METADATA_RESP) .correlationId(message.correlationId()) + .sender(localMember) .data(respData) .build(); LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender); - transport + + transportWrapper .send(sender, response) .subscribe( null, diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java index bbcbd1ab..000dbedf 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java @@ -66,14 +66,14 @@ public void testSeparateEmptyNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root1")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster root2 = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root2")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); assertThat(root.otherMembers(), iterableWithSize(0)); @@ -93,21 +93,21 @@ public void testSeparateNonEmptyNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root")) - .membership(opts -> opts.seedMembers(root.address(), bob.address())) + .membership(opts -> opts.seedMembers(root.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster root2 = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("root2")) - .membership(opts -> opts.seedMembers(root.address())) + .membership(opts -> opts.seedMembers(root.addresses())) .startAwait(); Cluster dan = @@ -117,7 +117,10 @@ public void testSeparateNonEmptyNamespaces() { .membership( opts -> opts.seedMembers( - root.address(), root2.address(), bob.address(), carol.address())) + root.addresses().get(0), + root2.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0))) .startAwait(); Cluster eve = @@ -127,11 +130,11 @@ public void testSeparateNonEmptyNamespaces() { .membership( opts -> opts.seedMembers( - root.address(), - root2.address(), - dan.address(), - bob.address(), - carol.address())) + root.addresses().get(0), + root2.addresses().get(0), + dan.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0))) .startAwait(); assertThat(root.otherMembers(), containsInAnyOrder(bob.member(), carol.member())); @@ -155,14 +158,15 @@ public void testSimpleNamespacesHierarchy() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop")) - .membership(opts -> opts.seedMembers(rootDevelop.address())) + .membership(opts -> opts.seedMembers(rootDevelop.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop")) - .membership(opts -> opts.seedMembers(rootDevelop.address(), bob.address())) + .membership( + opts -> opts.seedMembers(rootDevelop.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster dan = @@ -170,7 +174,11 @@ public void testSimpleNamespacesHierarchy() { .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("develop/develop-2")) .membership( - opts -> opts.seedMembers(rootDevelop.address(), bob.address(), carol.address())) + opts -> + opts.seedMembers( + rootDevelop.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0))) .startAwait(); Cluster eve = @@ -180,7 +188,10 @@ public void testSimpleNamespacesHierarchy() { .membership( opts -> opts.seedMembers( - rootDevelop.address(), bob.address(), carol.address(), dan.address())) + rootDevelop.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0), + dan.addresses().get(0))) .startAwait(); assertThat( @@ -206,14 +217,15 @@ public void testIsolatedParentNamespaces() { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("a/1/c")) - .membership(opts -> opts.seedMembers(parent1.address())) + .membership(opts -> opts.seedMembers(parent1.addresses())) .startAwait(); Cluster carol = new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .membership(opts -> opts.namespace("a/1/c")) - .membership(opts -> opts.seedMembers(parent1.address(), bob.address())) + .membership( + opts -> opts.seedMembers(parent1.addresses().get(0), bob.addresses().get(0))) .startAwait(); Cluster parent2 = @@ -229,7 +241,10 @@ public void testIsolatedParentNamespaces() { .membership( opts -> opts.seedMembers( - parent1.address(), parent2.address(), bob.address(), carol.address())) + parent1.addresses().get(0), + parent2.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0))) .startAwait(); //noinspection unused @@ -240,11 +255,11 @@ public void testIsolatedParentNamespaces() { .membership( opts -> opts.seedMembers( - parent1.address(), - parent2.address(), - bob.address(), - carol.address(), - dan.address())) + parent1.addresses().get(0), + parent2.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0), + dan.addresses().get(0))) .startAwait(); assertThat(parent1.otherMembers(), containsInAnyOrder(bob.member(), carol.member())); diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index dd104ade..89a9cbcd 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -92,7 +92,7 @@ public void testMembersAccessFromScheduler() { Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); Cluster otherNode = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -101,8 +101,8 @@ public void testMembersAccessFromScheduler() { // Members by address - Optional otherNodeOnSeedNode = seedNode.member(otherNode.address()); - Optional seedNodeOnOtherNode = otherNode.member(seedNode.address()); + Optional otherNodeOnSeedNode = seedNode.member(otherNode.addresses().get(0)); + Optional seedNodeOnOtherNode = otherNode.member(seedNode.addresses().get(0)); assertEquals(otherNode.member(), otherNodeOnSeedNode.orElse(null)); assertEquals(seedNode.member(), seedNodeOnOtherNode.orElse(null)); @@ -154,7 +154,7 @@ public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { // Start seed node Cluster seedNode = - new ClusterImpl(new ClusterConfig().externalHost("localhost").externalPort(7878)) + new ClusterImpl(new ClusterConfig().externalHosts("localhost")) .transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT)) .membership(opts -> opts.seedMembers(addresses)) .transportFactory(TcpTransportFactory::new) @@ -181,7 +181,7 @@ public void testJoinDynamicPort() { for (int i = 0; i < membersNum; i++) { otherNodes.add( new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait()); } @@ -212,7 +212,7 @@ public void testUpdateMetadata() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -221,7 +221,7 @@ public void testUpdateMetadata() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -285,7 +285,7 @@ public void testUpdateMetadataProperty() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -294,7 +294,7 @@ public void testUpdateMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -363,7 +363,7 @@ public void testRemoveMetadataProperty() throws Exception { metadataNode = new ClusterImpl() .config(opts -> opts.metadata(metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -372,7 +372,7 @@ public void testRemoveMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -452,19 +452,19 @@ public void onMembershipEvent(MembershipEvent event) { // Start nodes final Cluster node1 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node2 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); final Cluster node3 = new ClusterImpl() - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler(cluster -> listener) .startAwait(); @@ -506,7 +506,7 @@ public void onMembershipEvent(MembershipEvent event) { final Cluster node1 = new ClusterImpl() .config(opts -> opts.metadata(node1Metadata)) - .membership(opts -> opts.seedMembers(seedNode.address())) + .membership(opts -> opts.seedMembers(seedNode.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> @@ -559,9 +559,11 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { // Start seed node Cluster seedNode = new ClusterImpl().transportFactory(TcpTransportFactory::new).startAwait(); - Address nonExistingSeed1 = Address.from("localhost:1234"); - Address nonExistingSeed2 = Address.from("localhost:5678"); - Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; + List
seeds = new ArrayList<>(); + + seeds.add(Address.from("localhost:1234")); // Not existent + seeds.add(Address.from("localhost:5678")); // Not existent + seeds.addAll(seedNode.addresses()); Cluster otherNode = new ClusterImpl() @@ -590,14 +592,13 @@ private void shutdown(List nodes) { @Test public void testExplicitLocalMemberId() { - ClusterConfig config = ClusterConfig.defaultConfig() - .memberId("test-member"); + ClusterConfig config = ClusterConfig.defaultConfig().memberId("test-member"); ClusterImpl cluster = null; try { - cluster = (ClusterImpl) new ClusterImpl(config) - .transportFactory(TcpTransportFactory::new) - .startAwait(); + cluster = + (ClusterImpl) + new ClusterImpl(config).transportFactory(TcpTransportFactory::new).startAwait(); assertEquals("test-member", cluster.member().id()); } finally { diff --git a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java new file mode 100644 index 00000000..d3d15ec3 --- /dev/null +++ b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java @@ -0,0 +1,260 @@ +package io.scalecube.cluster; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import java.util.stream.Stream.Builder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class TransportWrapperTest { + + private final Message request = + Message.builder() + .sender( + new Member( + "request", + null, + Collections.singletonList(Address.from("request:0")), + "namespace")) + .data("" + System.currentTimeMillis()) + .build(); + + private final Message response = + Message.builder() + .sender( + new Member( + "response", + null, + Collections.singletonList(Address.from("response:0")), + "namespace")) + .data("" + System.currentTimeMillis()) + .build(); + + private final Transport transport; + private final TransportWrapper transportWrapper; + + public TransportWrapperTest() { + transport = mock(Transport.class); + transportWrapper = new TransportWrapper(transport); + } + + static Stream methodSource() { + final Builder builder = Stream.builder(); + + // int size, int startIndex, int successIndex + + for (int size = 0; size < 5; size++) { + populateBuilder(builder, size); + } + + return builder.build(); + } + + static void populateBuilder(Builder builder, int size) { + // int startIndex, int successIndex + + for (int startIndex = 0; startIndex < size; startIndex++) { + for (int successIndex = 0; successIndex < size; successIndex++) { + builder.add(Arguments.of(size, startIndex, successIndex)); + } + } + } + + private Map addressIndexByMember() + throws NoSuchFieldException, IllegalAccessException { + final Field field = TransportWrapper.class.getDeclaredField("addressIndexByMember"); + field.setAccessible(true); + //noinspection unchecked + return (Map) field.get(transportWrapper); + } + + @ParameterizedTest + @MethodSource("methodSource") + void requestResponseShouldWorkByRoundRobin(int size, int startIndex, int successIndex) + throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); + } + + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } + + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + if (i == successIndex) { + when(transport.requestResponse(address, request)).thenReturn(Mono.just(response)); + } else { + when(transport.requestResponse(address, request)) + .thenReturn(Mono.error(new RuntimeException("Error - " + i))); + } + } + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> assertSame(response, message, "response")) + .thenCancel() + .verify(); + + assertEquals(successIndex, addressIndexByMember().get(member), "successIndex"); + } + + @Test + void requestResponseShouldWorkThenFail() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); + + when(transport.requestResponse(addresses.get(0), request)) + .thenReturn(Mono.just(response)) + .thenReturn(Mono.error(new RuntimeException("Error"))); + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> assertSame(response, message, "response")) + .thenCancel() + .verify(); + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + } + + @Test + void requestResponseShouldFailThenWork() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); + + when(transport.requestResponse(addresses.get(0), request)) + .thenReturn(Mono.error(new RuntimeException("Error"))) + .thenReturn(Mono.just(response)); + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> assertSame(response, message, "response")) + .thenCancel() + .verify(); + } + + @ParameterizedTest + @MethodSource("methodSource") + void requestResponseShouldFailByRoundRobin(int size, int startIndex, int ignore) + throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); + } + + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } + + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + when(transport.requestResponse(address, request)) + .thenReturn(Mono.error(new RuntimeException("Error"))); + } + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + + assertEquals(startIndex, addressIndexByMember().get(member), "startIndex"); + } + + @ParameterizedTest + @MethodSource("methodSource") + void sendShouldWorkByRoundRobin(int size, int startIndex, int successIndex) throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); + } + + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } + + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + if (i == successIndex) { + when(transport.send(address, request)).thenReturn(Mono.empty()); + } else { + when(transport.send(address, request)) + .thenReturn(Mono.error(new RuntimeException("Error - " + i))); + } + } + + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + + assertEquals(successIndex, addressIndexByMember().get(member), "successIndex"); + } + + @ParameterizedTest + @MethodSource("methodSource") + void sendShouldFailByRoundRobin(int size, int startIndex, int ignore) throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); + } + + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } + + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + when(transport.send(address, request)).thenReturn(Mono.error(new RuntimeException("Error"))); + } + + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + + assertEquals(startIndex, addressIndexByMember().get(member), "startIndex"); + } + + @Test + void sendShouldWorkThenFail() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); + + when(transport.send(addresses.get(0), request)) + .thenReturn(Mono.empty()) + .thenReturn(Mono.error(new RuntimeException("Error"))); + + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + } + + @Test + void sendShouldFailThenWork() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); + + when(transport.send(addresses.get(0), request)) + .thenReturn(Mono.error(new RuntimeException("Error"))) + .thenReturn(Mono.empty()); + + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies(throwable -> assertEquals("Error", throwable.getMessage())); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + } +} diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 3c2241d8..0a8051f0 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -445,7 +445,8 @@ private static void assertStatus( events.stream() .filter(event -> event.status() == status) .map(FailureDetectorEvent::member) - .map(Member::address) + .map(Member::addresses) + .flatMap(Collection::stream) .collect(Collectors.toList()); String msg1 = @@ -473,7 +474,9 @@ private static Future> listenNextEventFor( List> resultFuture = new ArrayList<>(); for (final Address member : addresses) { final CompletableFuture future = new CompletableFuture<>(); - fd.listen().filter(event -> event.member().address() == member).subscribe(future::complete); + fd.listen() + .filter(event -> event.member().addresses().contains(member)) + .subscribe(future::complete); resultFuture.add(future); } diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java index 3857d1e6..256ff0be 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipDelayTest.java @@ -57,7 +57,10 @@ public void testMessageDelayMoreThanGossipSweepTime() throws InterruptedExceptio gossipProtocol3.listen().subscribe(message -> protocol3GossipCounter.incrementAndGet()); for (int i = 0; i < 3; i++) { - gossipProtocol1.spread(Message.fromData("message: " + i)).subscribe(); + final Member member = BaseTest.getField(gossipProtocol1, "localMember"); + gossipProtocol1 + .spread(Message.builder().sender(member).data("message: " + i).build()) + .subscribe(); } TimeUnit.MILLISECONDS.sleep( diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index ae82bca2..35889aaa 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -147,7 +147,9 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E // Spread gossip, measure and verify delivery metrics long start = System.currentTimeMillis(); - gossipProtocols.get(0).spread(Message.fromData(gossipData)).subscribe(); + final GossipProtocolImpl gossipProtocol = gossipProtocols.get(0); + final Member member = BaseTest.getField(gossipProtocol, "localMember"); + gossipProtocol.spread(Message.builder().sender(member).data(gossipData).build()).subscribe(); latch.await(2 * gossipTimeout, TimeUnit.MILLISECONDS); // Await for double gossip timeout disseminationTime = System.currentTimeMillis() - start; messageSentStatsDissemination = computeMessageSentStats(gossipProtocols); diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java index e2dfa1fc..81531a09 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java @@ -27,6 +27,7 @@ public class GossipRequestTest extends BaseTest { private TestData testData; private MessageCodec messageCodec; + private Member sender; @BeforeEach public void init() { @@ -35,14 +36,18 @@ public void init() { testData = new TestData(); testData.setProperties(properties); messageCodec = MessageCodec.INSTANCE; + sender = new Member("0", null, Address.from("localhost:1234"), NAMESPACE); } @Test public void testSerializationAndDeserialization() throws Exception { - Member from = new Member("0", null, Address.from("localhost:1234"), NAMESPACE); List gossips = getGossips(); Message message = - Message.withData(new GossipRequest(gossips, from.id())).correlationId("CORR_ID").build(); + Message.builder() + .sender(sender) + .data(new GossipRequest(gossips, sender.id())) + .correlationId("CORR_ID") + .build(); ByteArrayOutputStream out = new ByteArrayOutputStream(); messageCodec.serialize(message, out); @@ -68,10 +73,10 @@ public void testSerializationAndDeserialization() throws Exception { } private List getGossips() { - Gossip request = - new Gossip("idGossip", Message.withData(testData).qualifier(testDataQualifier).build(), 0); - Gossip request2 = - new Gossip("idGossip2", Message.withData(testData).qualifier(testDataQualifier).build(), 1); + final Message message = + Message.builder().sender(sender).data(testData).qualifier(testDataQualifier).build(); + Gossip request = new Gossip("idGossip", message, 0); + Gossip request2 = new Gossip("idGossip2", message, 1); List gossips = new ArrayList<>(2); gossips.add(request); gossips.add(request2); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 4fced8e2..9a70e834 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -126,6 +126,7 @@ public void testLeaveClusterCameBeforeAlive() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build(); @@ -137,6 +138,7 @@ public void testLeaveClusterCameBeforeAlive() { final Message addedMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(addedRecord) .build(); @@ -171,6 +173,7 @@ public void testLeaveClusterOnly() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build(); @@ -205,6 +208,7 @@ public void testLeaveClusterOnSuspectedNode() { final Message suspectMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(suspectedNode) .build(); @@ -219,6 +223,7 @@ public void testLeaveClusterOnSuspectedNode() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build(); @@ -762,19 +767,19 @@ public void testOverrideMemberAddress() throws UnknownHostException { NetworkEmulatorTransport e = createTransport(); MembershipProtocolImpl cmA = - createMembership(a, testConfig(Collections.emptyList()).externalHost(localAddress)); + createMembership(a, testConfig(Collections.emptyList()).externalHosts(localAddress)); MembershipProtocolImpl cmB = createMembership( - b, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + b, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress)); MembershipProtocolImpl cmC = createMembership( - c, testConfig(Collections.singletonList(a.address())).externalHost(localAddress)); + c, testConfig(Collections.singletonList(a.address())).externalHosts(localAddress)); MembershipProtocolImpl cmD = createMembership( - d, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + d, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress)); MembershipProtocolImpl cmE = createMembership( - e, testConfig(Collections.singletonList(b.address())).externalHost(localAddress)); + e, testConfig(Collections.singletonList(b.address())).externalHosts(localAddress)); try { awaitSeconds(3); diff --git a/codec-parent/codec-jackson-smile/src/test/java/io/scalecube/cluster/codec/jackson/smile/JacksonSmileMessageCodecTest.java b/codec-parent/codec-jackson-smile/src/test/java/io/scalecube/cluster/codec/jackson/smile/JacksonSmileMessageCodecTest.java index 36e1dcd8..a1bd5146 100644 --- a/codec-parent/codec-jackson-smile/src/test/java/io/scalecube/cluster/codec/jackson/smile/JacksonSmileMessageCodecTest.java +++ b/codec-parent/codec-jackson-smile/src/test/java/io/scalecube/cluster/codec/jackson/smile/JacksonSmileMessageCodecTest.java @@ -3,8 +3,10 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; +import io.scalecube.net.Address; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -17,12 +19,13 @@ class JacksonSmileMessageCodecTest { private static final MessageCodec messageCodec = MessageCodec.INSTANCE; private static final Random random = new Random(); + private static final Member member = new Member("0", null, Address.NULL_ADDRESS, "NAMESPACE"); @Test void serializeAndDeserializeByteBuffer() throws Exception { byte[] bytes = "hello".getBytes(); - Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build(); + Message to = Message.builder().sender(member).data(new Entity(ByteBuffer.wrap(bytes))).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -42,7 +45,7 @@ void serializeAndDeserializeDirectByteBuffer() throws Exception { byteBuffer.put(bytes); byteBuffer.flip(); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -57,7 +60,7 @@ void serializeAndDeserializeDirectByteBuffer() throws Exception { void serializeAndDeserializeEmptyByteBuffer() throws Exception { byte[] bytes = new byte[0]; - Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build(); + Message to = Message.builder().sender(member).data(new Entity(ByteBuffer.wrap(bytes))).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -82,7 +85,7 @@ void serializeAndDeserializeByteBufferWithOffset() throws Exception { assertEquals(offset, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -106,7 +109,7 @@ void serializeAndDeserializeByteBufferWithOffsetSlice() throws Exception { assertEquals(0, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -134,7 +137,7 @@ void serializeAndDeserializeDirectByteBufferWithOffset() throws Exception { assertEquals(offset, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -162,7 +165,7 @@ void serializeAndDeserializeDirectByteBufferWithOffsetSlice() throws Exception { assertEquals(0, slice.position()); assertEquals(bytes.length - offset, slice.remaining()); - Message to = Message.builder().data(new Entity(slice)).build(); + Message to = Message.builder().sender(member).data(new Entity(slice)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -178,7 +181,7 @@ void serializeAndDeserializeDirectByteBufferWithOffsetSlice() throws Exception { void serializeAndDeserializeByteBufferWithoutEntity() throws Exception { byte[] bytes = "hello".getBytes(); - Message to = Message.builder().data(ByteBuffer.wrap(bytes)).build(); + Message to = Message.builder().sender(member).data(ByteBuffer.wrap(bytes)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); diff --git a/codec-parent/codec-jackson/src/test/java/io/scalecube/cluster/codec/jackson/JacksonMessageCodecTest.java b/codec-parent/codec-jackson/src/test/java/io/scalecube/cluster/codec/jackson/JacksonMessageCodecTest.java index 191981d5..42898233 100644 --- a/codec-parent/codec-jackson/src/test/java/io/scalecube/cluster/codec/jackson/JacksonMessageCodecTest.java +++ b/codec-parent/codec-jackson/src/test/java/io/scalecube/cluster/codec/jackson/JacksonMessageCodecTest.java @@ -3,8 +3,10 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.MessageCodec; +import io.scalecube.net.Address; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; @@ -17,12 +19,13 @@ class JacksonMessageCodecTest { private static final MessageCodec messageCodec = MessageCodec.INSTANCE; private static final Random random = new Random(); + private static final Member member = new Member("0", null, Address.NULL_ADDRESS, "NAMESPACE"); @Test void serializeAndDeserializeByteBuffer() throws Exception { byte[] bytes = "hello".getBytes(); - Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build(); + Message to = Message.builder().sender(member).data(new Entity(ByteBuffer.wrap(bytes))).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -42,7 +45,7 @@ void serializeAndDeserializeDirectByteBuffer() throws Exception { byteBuffer.put(bytes); byteBuffer.flip(); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -57,7 +60,7 @@ void serializeAndDeserializeDirectByteBuffer() throws Exception { void serializeAndDeserializeEmptyByteBuffer() throws Exception { byte[] bytes = new byte[0]; - Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build(); + Message to = Message.builder().sender(member).data(new Entity(ByteBuffer.wrap(bytes))).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -82,7 +85,7 @@ void serializeAndDeserializeByteBufferWithOffset() throws Exception { assertEquals(offset, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -106,7 +109,7 @@ void serializeAndDeserializeByteBufferWithOffsetSlice() throws Exception { assertEquals(0, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -134,7 +137,7 @@ void serializeAndDeserializeDirectByteBufferWithOffset() throws Exception { assertEquals(offset, byteBuffer.position()); assertEquals(bytes.length - offset, byteBuffer.remaining()); - Message to = Message.builder().data(new Entity(byteBuffer)).build(); + Message to = Message.builder().sender(member).data(new Entity(byteBuffer)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -162,7 +165,7 @@ void serializeAndDeserializeDirectByteBufferWithOffsetSlice() throws Exception { assertEquals(0, slice.position()); assertEquals(bytes.length - offset, slice.remaining()); - Message to = Message.builder().data(new Entity(slice)).build(); + Message to = Message.builder().sender(member).data(new Entity(slice)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); @@ -178,7 +181,7 @@ void serializeAndDeserializeDirectByteBufferWithOffsetSlice() throws Exception { void serializeAndDeserializeByteBufferWithoutEntity() throws Exception { byte[] bytes = "hello".getBytes(); - Message to = Message.builder().data(ByteBuffer.wrap(bytes)).build(); + Message to = Message.builder().sender(member).data(ByteBuffer.wrap(bytes)).build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); messageCodec.serialize(to, output); diff --git a/examples/pom.xml b/examples/pom.xml index a90c00a3..1456aee9 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,11 +21,6 @@ scalecube-transport-netty ${project.version} - - io.scalecube - scalecube-cluster-testlib - ${project.version} - io.scalecube scalecube-codec-jackson-smile diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java index cff607f2..b2312f15 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java @@ -30,7 +30,7 @@ public static void main(String[] args) { Cluster bob = new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -39,7 +39,7 @@ public static void main(String[] args) { Cluster carol = new ClusterImpl() .config(opts -> opts.memberAlias("Carol").metadata(metadata)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -47,7 +47,7 @@ public static void main(String[] args) { ClusterConfig configWithFixedPort = new ClusterConfig() .memberAlias("Dan") - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transport(opts -> opts.port(3000)); Cluster dan = new ClusterImpl(configWithFixedPort) @@ -61,10 +61,10 @@ public static void main(String[] args) { .membership( opts -> opts.seedMembers( - alice.address(), - bob.address(), - carol.address(), - dan.address()) // won't join anyway + alice.addresses().get(0), + bob.addresses().get(0), + carol.addresses().get(0), + dan.addresses().get(0)) // won't join anyway .namespace("another-cluster")); Cluster eve = new ClusterImpl(configWithSyncGroup) @@ -75,31 +75,31 @@ public static void main(String[] args) { System.out.println( "Alice (" - + alice.address() + + alice.addresses() + ") cluster: " + alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob (" - + bob.address() + + bob.addresses() + ") cluster: " + bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Carol (" - + carol.address() + + carol.addresses() + ") cluster: " + carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Dan (" - + dan.address() + + dan.addresses() + ") cluster: " + dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Eve (" - + eve.address() + + eve.addresses() + ") cluster: " // alone in cluster + eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); } diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java index bb1910b1..611c5cfa 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinNamespacesExamples.java @@ -24,7 +24,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) .membership(opts -> opts.namespace("alice/bob-and-carol")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -33,7 +33,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Carol")) .membership(opts -> opts.namespace("alice/bob-and-carol")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -41,7 +41,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob-and-Carol-Child-1")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-1")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -49,7 +49,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob-and-Carol-Child-2")) .membership(opts -> opts.namespace("alice/bob-and-carol/child-2")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -58,7 +58,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Dan")) .membership(opts -> opts.namespace("alice/dan-and-eve")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -67,7 +67,7 @@ public static void main(String[] args) { new ClusterImpl() .config(opts -> opts.memberAlias("Eve")) .membership(opts -> opts.namespace("alice/dan-and-eve")) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); @@ -75,37 +75,37 @@ public static void main(String[] args) { System.out.println( "Alice (" - + alice.address() + + alice.addresses() + ") cluster: " + alice.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob (" - + bob.address() + + bob.addresses() + ") cluster: " + bob.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Carol (" - + carol.address() + + carol.addresses() + ") cluster: " + carol.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Dan (" - + dan.address() + + dan.addresses() + ") cluster: " + dan.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Eve (" - + eve.address() + + eve.addresses() + ") cluster: " // alone in cluster + eve.members().stream().map(Member::toString).collect(joining("\n", "\n", "\n"))); System.out.println( "Bob-And-Carol-Child-1 (" - + bobAndCarolChild1.address() + + bobAndCarolChild1.addresses() + ") cluster: " // alone in cluster + bobAndCarolChild1.members().stream() .map(Member::toString) @@ -113,7 +113,7 @@ public static void main(String[] args) { System.out.println( "Bob-And-Carol-Child-2 (" - + carolChild2.address() + + carolChild2.addresses() + ") cluster: " // alone in cluster + carolChild2.members().stream() .map(Member::toString) diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java index 40fd88e9..feac6d80 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java @@ -29,7 +29,7 @@ public static void main(String[] args) throws Exception { Cluster joe = new ClusterImpl() .config(opts -> opts.metadata(Collections.singletonMap("name", "Joe"))) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { diff --git a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java index 90604932..6dd1f2fe 100644 --- a/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java +++ b/examples/src/main/java/io/scalecube/examples/CustomMetadataEncodingExample.java @@ -24,7 +24,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(123L)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .startAwait(); System.out.println( "[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null)); @@ -33,7 +33,7 @@ public static void main(String[] args) throws Exception { new ClusterImpl() .transportFactory(WebsocketTransportFactory::new) .config(opts -> opts.metadataCodec(new LongMetadataCodec()).metadata(456L)) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .startAwait(); System.out.println( "[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null)); diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java index 2135880c..e0012d03 100644 --- a/examples/src/main/java/io/scalecube/examples/GossipExample.java +++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java @@ -3,6 +3,7 @@ import io.scalecube.cluster.Cluster; import io.scalecube.cluster.ClusterImpl; import io.scalecube.cluster.ClusterMessageHandler; +import io.scalecube.cluster.Member; import io.scalecube.cluster.transport.api.Message; import io.scalecube.transport.netty.tcp.TcpTransportFactory; @@ -34,7 +35,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster bob = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -50,7 +51,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster carol = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -66,7 +67,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster dan = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -82,10 +83,14 @@ public void onGossip(Message gossip) { // Start cluster node Eve that joins cluster and spreads gossip Cluster eve = new ClusterImpl() - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .startAwait(); - eve.spreadGossip(Message.fromData("Gossip from Eve")) + + final Member member = eve.member(); + final Message message = Message.builder().sender(member).data("Gossip from Eve").build(); + + eve.spreadGossip(message) .doOnError(System.err::println) .subscribe(null, Throwable::printStackTrace); diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java index 2040588f..b68bb3b0 100644 --- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java +++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java @@ -47,7 +47,7 @@ public void onMembershipEvent(MembershipEvent event) { new ClusterImpl() .config(opts -> opts.memberAlias("Bob")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Bob"))) - .membership(opts -> opts.seedMembers(alice.address())) + .membership(opts -> opts.seedMembers(alice.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { @@ -66,7 +66,7 @@ public void onMembershipEvent(MembershipEvent event) { new ClusterImpl() .config(opts -> opts.memberAlias("Carol")) .config(opts -> opts.metadata(Collections.singletonMap("name", "Carol"))) - .membership(opts -> opts.seedMembers(bob.address())) + .membership(opts -> opts.seedMembers(bob.addresses())) .transportFactory(TcpTransportFactory::new) .handler( cluster -> { diff --git a/pom.xml b/pom.xml index e21c8b82..a105d8d3 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -40,12 +42,13 @@ 2020.0.32 2.15.1 - 5.3.1 - 5.9.3 + 4.6.1 + 5.8.2 1.3 https://maven.pkg.github.com/scalecube/scalecube-cluster + true @@ -55,6 +58,7 @@ cluster-testlib transport-parent codec-parent + cluster-tests diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java index b5e3e879..b1bd2bc4 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java @@ -1,6 +1,5 @@ package io.scalecube.cluster.transport.api; -import io.scalecube.net.Address; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -10,7 +9,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Optional; import java.util.StringJoiner; /** @@ -32,42 +30,18 @@ public final class Message implements Externalizable { */ public static final String HEADER_CORRELATION_ID = "cid"; - /** - * This header represents sender address of type {@link Address}. It's an address of message - * originator. This header is optional. - */ - public static final String HEADER_SENDER = "sender"; - private Map headers = Collections.emptyMap(); - private Object data; + private Object sender; // Member + private Object data; // GossipReq|SyncData|SyncAckData|PingReqData|... public Message() {} private Message(Builder builder) { + this.sender = Objects.requireNonNull(builder.sender, "sender"); this.data = builder.data; this.headers = Collections.unmodifiableMap(Objects.requireNonNull(builder.headers)); } - /** - * Instantiates a new message with the given data and without headers. - * - * @param data the data to build a message from - * @return the built message - */ - public static Message fromData(Object data) { - return withData(data).build(); - } - - /** - * Instantiates a new message builder with the given data and without headers. - * - * @param data the initial data for the builder - * @return a builder with initial data - */ - public static Builder withData(Object data) { - return builder().data(data); - } - /** * Instantiates a new message with the given headers and with empty data. * @@ -125,7 +99,7 @@ public static Message from(Message message) { * @return a builder with initial data and headers from the message */ public static Builder with(Message message) { - return withData(message.data).headers(message.headers); + return builder().sender(message.sender).data(message.data).headers(message.headers); } /** @@ -174,6 +148,16 @@ public String correlationId() { return header(HEADER_CORRELATION_ID); } + /** + * Returns {@code Member} of the sender of this message. + * + * @return address + */ + public T sender() { + //noinspection unchecked + return (T) sender; + } + /** * Return the message data, which can be byte array, string or any type. * @@ -185,19 +169,11 @@ public T data() { return (T) data; } - /** - * Returns {@link Address} of the sender of this message. - * - * @return address - */ - public Address sender() { - return Optional.ofNullable(header(HEADER_SENDER)).map(Address::from).orElse(null); - } - @Override public String toString() { return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]") .add("headers=" + headers) + .add("sender=" + sender) .add("data=" + data) .toString(); } @@ -210,6 +186,8 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeUTF(header.getKey()); out.writeObject(header.getValue()); // value is nullable } + // sender + out.writeObject(sender); // data out.writeObject(data); } @@ -225,6 +203,8 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept headers.put(name, value); } this.headers = Collections.unmodifiableMap(headers); + // sender + sender = in.readObject(); // data data = in.readObject(); } @@ -232,6 +212,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept public static class Builder { private final Map headers = new HashMap<>(); + private Object sender; private Object data; private Builder() {} @@ -245,6 +226,15 @@ public Builder data(Object data) { return this; } + public Object sender() { + return sender; + } + + public Builder sender(Object sender) { + this.sender = sender; + return this; + } + private Map headers() { return this.headers; } @@ -281,10 +271,6 @@ public Builder correlationId(String correlationId) { return header(HEADER_CORRELATION_ID, correlationId); } - public Builder sender(Address sender) { - return header(HEADER_SENDER, sender.toString()); - } - public Message build() { return new Message(this); } diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java index 06129f83..847188db 100644 --- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java +++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java @@ -23,6 +23,7 @@ public final class TransportConfig implements Cloneable { private int maxFrameLength = 2 * 1024 * 1024; // 2 MB private TransportFactory transportFactory; private Function addressMapper = Function.identity(); + private boolean exposeAddress = true; public TransportConfig() {} @@ -73,7 +74,7 @@ public TransportConfig port(int port) { return t; } - public boolean isClientSecured() { + public boolean clientSecured() { return clientSecured; } @@ -137,15 +138,19 @@ public TransportConfig maxFrameLength(int maxFrameLength) { return t; } + public TransportFactory transportFactory() { + return transportFactory; + } + /** - * Setter for {@code addressMapper}. + * Setter for {@code transportFactory}. * - * @param addressMapper address mapper + * @param transportFactory transport factory * @return new {@code TransportConfig} instance */ - public TransportConfig addressMapper(Function addressMapper) { + public TransportConfig transportFactory(TransportFactory transportFactory) { TransportConfig t = clone(); - t.addressMapper = addressMapper; + t.transportFactory = transportFactory; return t; } @@ -153,19 +158,32 @@ public Function addressMapper() { return addressMapper; } - public TransportFactory transportFactory() { - return transportFactory; + /** + * Setter for {@code addressMapper}. + * + * @param addressMapper address mapper + * @return new {@code TransportConfig} instance + */ + public TransportConfig addressMapper(Function addressMapper) { + TransportConfig t = clone(); + t.addressMapper = addressMapper; + return t; + } + + public boolean exposeAddress() { + return exposeAddress; } /** - * Setter for {@code transportFactory}. + * Setter for {@code exposeAddress}. When set to {@code true} - will make transport listening + * address be advertised to cluster. By default set to {@code true}. * - * @param transportFactory transport factory + * @param exposeAddress exposeAddress flag * @return new {@code TransportConfig} instance */ - public TransportConfig transportFactory(TransportFactory transportFactory) { + public TransportConfig exposeAddress(boolean exposeAddress) { TransportConfig t = clone(); - t.transportFactory = transportFactory; + t.exposeAddress = exposeAddress; return t; } @@ -188,6 +206,7 @@ public String toString() { .add("maxFrameLength=" + maxFrameLength) .add("transportFactory=" + transportFactory) .add("addressMapper=" + addressMapper) + .add("exposeAddress=" + exposeAddress) .toString(); } } diff --git a/transport-parent/transport-netty/pom.xml b/transport-parent/transport-netty/pom.xml index a362787b..fcc4941f 100644 --- a/transport-parent/transport-netty/pom.xml +++ b/transport-parent/transport-netty/pom.xml @@ -22,14 +22,6 @@ io.projectreactor.netty reactor-netty - - - - ${project.groupId} - scalecube-cluster-testlib - ${project.version} - test - diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java index 708d1a95..a9883a49 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java @@ -52,6 +52,6 @@ private TcpClient newTcpClient(SenderContext context, Address address) { (connectionObserver, channel, remoteAddress) -> new TcpChannelInitializer(config.maxFrameLength()) .accept(connectionObserver, channel)); - return config.isClientSecured() ? tcpClient.secure() : tcpClient; + return config.clientSecured() ? tcpClient.secure() : tcpClient; } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java index 704a7f3d..6a62c1ae 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java @@ -55,7 +55,7 @@ private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, Add .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout()); - if (config.isClientSecured()) { + if (config.clientSecured()) { httpClient = httpClient.secure(); } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java deleted file mode 100644 index 3d07e84d..00000000 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/BaseTest.java +++ /dev/null @@ -1,89 +0,0 @@ -package io.scalecube.transport.netty; - -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.cluster.transport.api.TransportConfig; -import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; -import io.scalecube.transport.netty.tcp.TcpTransportFactory; -import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; -import java.time.Duration; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; - -/** Base test class. */ -public class BaseTest { - - protected static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); - - @BeforeEach - public final void baseSetUp(TestInfo testInfo) { - LOGGER.info("***** Test started : " + testInfo.getDisplayName() + " *****"); - } - - @AfterEach - public final void baseTearDown(TestInfo testInfo) { - LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****"); - } - - /** - * Sending message from src to destination. - * - * @param transport src - * @param to destination - * @param msg request - */ - protected Mono send(Transport transport, Address to, Message msg) { - return transport - .send(to, msg) - .doOnError( - th -> - LOGGER.error( - "Failed to send {} to {} from transport: {}, cause: {}", - msg, - to, - transport, - th.toString())); - } - - /** - * Stopping transport. - * - * @param transport trnasport object - */ - protected void destroyTransport(Transport transport) { - if (transport != null && !transport.isStopped()) { - try { - transport.stop().block(Duration.ofSeconds(1)); - } catch (Exception ex) { - LOGGER.warn("Failed to await transport termination: " + ex); - } - } - } - - /** - * Factory method to create a transport. - * - * @return tramsprot - */ - protected NetworkEmulatorTransport createTcpTransport() { - return new NetworkEmulatorTransport( - Transport.bindAwait( - TransportConfig.defaultConfig().transportFactory(new TcpTransportFactory()))); - } - - /** - * Factory method to create a transport. - * - * @return tramsprot - */ - protected NetworkEmulatorTransport createWebsocketTransport() { - return new NetworkEmulatorTransport( - Transport.bindAwait( - TransportConfig.defaultConfig().transportFactory(new WebsocketTransportFactory()))); - } -} diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java deleted file mode 100644 index 753622d7..00000000 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportSendOrderTest.java +++ /dev/null @@ -1,252 +0,0 @@ -package io.scalecube.transport.netty.tcp; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; -import io.scalecube.transport.netty.BaseTest; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.LongSummaryStatistics; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.stream.LongStream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import reactor.core.Disposable; -import reactor.core.Exceptions; - -public class TcpTransportSendOrderTest extends BaseTest { - - // Auto-destroyed on tear down - private Transport client; - private Transport server; - - /** Tear down. */ - @AfterEach - public final void tearDown() { - destroyTransport(client); - destroyTransport(server); - } - - @Test - public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws Exception { - server = createTcpTransport(); - - int iterationNum = 11; // +1 warm up iteration - int sentPerIteration = 1000; - long[] iterationTimeSeries = new long[iterationNum - 1]; - for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - - client = createTcpTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(sentPerIteration); - - final Disposable serverSubscriber = - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - long startAt = System.currentTimeMillis(); - for (int j = 0; j < sentPerIteration; j++) { - Message message = Message.withQualifier("q" + j).build(); - client - .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - latch.await(20, TimeUnit.SECONDS); - long iterationTime = System.currentTimeMillis() - startAt; - if (i > 0) { // exclude warm up iteration - iterationTimeSeries[i - 1] = iterationTime; - } - assertSendOrder(sentPerIteration, received); - - LOGGER.debug("Iteration time: {} ms", iterationTime); - - serverSubscriber.dispose(); - destroyTransport(client); - } - - LongSummaryStatistics iterationTimeStats = - LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); - } - - @Test - public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { - server = createTcpTransport(); - - int iterationNum = 11; // +1 warm up iteration - int sentPerIteration = 1000; - long[] iterationTimeSeries = new long[iterationNum - 1]; - List totalSentTimeSeries = new ArrayList<>(sentPerIteration * (iterationNum - 1)); - for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - List iterSentTimeSeries = new ArrayList<>(sentPerIteration); - - client = createTcpTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(sentPerIteration); - - final Disposable serverSubscriber = - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - long startAt = System.currentTimeMillis(); - for (int j = 0; j < sentPerIteration; j++) { - long sentAt = System.currentTimeMillis(); - Message message = Message.withQualifier("q" + j).build(); - client - .send(server.address(), message) - .subscribe( - avoid -> iterSentTimeSeries.add(System.currentTimeMillis() - sentAt), - th -> - LOGGER.error( - "Failed to send message in {} ms", - System.currentTimeMillis() - sentAt, - th)); - } - - latch.await(20, TimeUnit.SECONDS); - long iterationTime = System.currentTimeMillis() - startAt; - if (i > 0) { // exclude warm up iteration - iterationTimeSeries[i - 1] = iterationTime; - } - assertSendOrder(sentPerIteration, received); - - Thread.sleep(10); // await a bit for last msg confirmation - - LongSummaryStatistics iterSentTimeStats = - iterSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - if (i == 0) { // warm up iteration - LOGGER.debug("Warm up iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats warm up iter (ms): {}", iterSentTimeStats); - } else { - totalSentTimeSeries.addAll(iterSentTimeSeries); - LongSummaryStatistics totalSentTimeStats = - totalSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - LOGGER.debug("Iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats iter (ms): {}", iterSentTimeStats); - LOGGER.debug("Sent time stats total (ms): {}", totalSentTimeStats); - } - - serverSubscriber.dispose(); - destroyTransport(client); - } - - LongSummaryStatistics iterationTimeStats = - LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); - } - - @Test - public void testSendOrderMultiThread(TestInfo testInfo) throws Exception { - Transport server = createTcpTransport(); - - final int total = 1000; - for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - ExecutorService exec = - Executors.newFixedThreadPool( - 4, - r -> { - Thread thread = new Thread(r); - thread.setName("testSendOrderMultiThread"); - thread.setDaemon(true); - return thread; - }); - - Transport client = createTcpTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(4 * total); - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - final Future f0 = exec.submit(sender(0, client, server.address(), total)); - final Future f1 = exec.submit(sender(1, client, server.address(), total)); - final Future f2 = exec.submit(sender(2, client, server.address(), total)); - final Future f3 = exec.submit(sender(3, client, server.address(), total)); - - latch.await(20, TimeUnit.SECONDS); - - f0.get(1, TimeUnit.SECONDS); - f1.get(1, TimeUnit.SECONDS); - f2.get(1, TimeUnit.SECONDS); - f3.get(1, TimeUnit.SECONDS); - - exec.shutdownNow(); - - assertSenderOrder(0, total, received); - assertSenderOrder(1, total, received); - assertSenderOrder(2, total, received); - assertSenderOrder(3, total, received); - - destroyTransport(client); - } - - destroyTransport(client); - destroyTransport(server); - } - - private void assertSendOrder(int total, List received) { - ArrayList messages = new ArrayList<>(received); - assertEquals(total, messages.size()); - for (int k = 0; k < total; k++) { - assertEquals("q" + k, messages.get(k).qualifier()); - } - } - - private Callable sender(int id, Transport client, Address address, int total) { - return () -> { - for (int j = 0; j < total; j++) { - String correlationId = id + "/" + j; - try { - Message message = Message.withQualifier("q").correlationId(correlationId).build(); - client.send(address, message).block(Duration.ofSeconds(3)); - } catch (Exception e) { - LOGGER.error("Failed to send message: j = {} id = {}", j, id, e); - throw Exceptions.propagate(e); - } - } - return null; - }; - } - - private void assertSenderOrder(int id, int total, List received) { - ArrayList messages = new ArrayList<>(received); - Map> group = new HashMap<>(); - for (Message message : messages) { - Integer key = Integer.valueOf(message.correlationId().split("/")[0]); - group.computeIfAbsent(key, ArrayList::new).add(message); - } - - assertEquals(total, group.get(id).size()); - for (int k = 0; k < total; k++) { - assertEquals(id + "/" + k, group.get(id).get(k).correlationId()); - } - } -} diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java deleted file mode 100644 index f8bb8daa..00000000 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ /dev/null @@ -1,344 +0,0 @@ -package io.scalecube.transport.netty.tcp; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; -import io.scalecube.transport.netty.BaseTest; -import java.io.IOException; -import java.net.UnknownHostException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; -import reactor.test.StepVerifier; - -public class TcpTransportTest extends BaseTest { - - public static final Duration TIMEOUT = Duration.ofSeconds(10); - - // Auto-destroyed on tear down - private NetworkEmulatorTransport client; - private NetworkEmulatorTransport server; - - /** Tear down. */ - @AfterEach - public final void tearDown() { - destroyTransport(client); - destroyTransport(server); - } - - @Test - public void testUnresolvedHostConnection() { - client = createTcpTransport(); - // create transport with wrong host - try { - Address address = Address.from("wronghost:49255"); - Message message = Message.withData("q").build(); - client.send(address, message).block(Duration.ofSeconds(20)); - fail("fail"); - } catch (Exception e) { - assertEquals( - UnknownHostException.class, e.getCause().getClass(), "Unexpected exception class"); - } - } - - @Test - public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); - for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - - client = createTcpTransport(); - - // create transport and don't wait just send message - try { - Message msg = Message.withData("q").build(); - client.send(serverAddress, msg).block(Duration.ofSeconds(3)); - fail("fail"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); - } - - // send second message: no connection yet and it's clear that there's no connection - try { - Message msg = Message.withData("q").build(); - client.send(serverAddress, msg).block(Duration.ofSeconds(3)); - fail("fail"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); - } - - destroyTransport(client); - } - } - - @Test - public void testPingPongClientTfListenAndServerTfListen() throws Exception { - client = createTcpTransport(); - server = createTcpTransport(); - - server - .listen() - .subscribe( - message -> { - Address address = message.sender(); - assertEquals(client.address(), address, "Expected clientAddress"); - send(server, address, Message.fromQualifier("hi client")).subscribe(); - }); - - CompletableFuture messageFuture = new CompletableFuture<>(); - client.listen().subscribe(messageFuture::complete); - - send(client, server.address(), Message.fromQualifier("hello server")).subscribe(); - - Message result = messageFuture.get(3, TimeUnit.SECONDS); - assertNotNull(result, "No response from serverAddress"); - assertEquals("hi client", result.qualifier()); - } - - @Test - public void testNetworkSettings() { - client = createTcpTransport(); - server = createTcpTransport(); - - int lostPercent = 50; - int mean = 0; - client.networkEmulator().outboundSettings(server.address(), lostPercent, mean); - - final List serverMessageList = new ArrayList<>(); - server.listen().subscribe(serverMessageList::add); - - int total = 1000; - Flux.range(0, total) - .flatMap(i -> client.send(server.address(), Message.withData("q" + i).build())) - .onErrorContinue((th, o) -> {}) - .blockLast(TIMEOUT); - - int expectedMax = - total / 100 * lostPercent + total / 100 * 5; // +5% for maximum possible lost messages - int size = serverMessageList.size(); - assertTrue(size < expectedMax, "expectedMax=" + expectedMax + ", actual size=" + size); - } - - @Test - public void testPingPongOnSingleChannel() throws Exception { - server = createTcpTransport(); - client = createTcpTransport(); - - server - .listen() - .buffer(2) - .subscribe( - messages -> { - for (Message message : messages) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }); - - final CompletableFuture> targetFuture = new CompletableFuture<>(); - client.listen().buffer(2).subscribe(targetFuture::complete); - - Message q1 = Message.withData("q1").build(); - Message q2 = Message.withData("q2").build(); - - client - .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - client - .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - - List target = targetFuture.get(1, TimeUnit.SECONDS); - assertNotNull(target); - assertEquals(2, target.size()); - } - - @Test - public void testShouldRequestResponseSuccess() { - client = createTcpTransport(); - server = createTcpTransport(); - - server - .listen() - .filter(req -> req.qualifier().equals("hello/server")) - .subscribe( - message -> - send( - server, - message.sender(), - Message.builder() - .correlationId(message.correlationId()) - .data("hello: " + message.data()) - .build()) - .subscribe()); - - String result = - client - .requestResponse( - server.address(), - Message.builder() - .qualifier("hello/server") - .correlationId("123xyz") - .data("server") - .build()) - .map(msg -> msg.data().toString()) - .block(Duration.ofSeconds(1)); - - assertEquals("hello: server", result); - } - - @Test - public void testPingPongOnSeparateChannel() throws Exception { - server = createTcpTransport(); - client = createTcpTransport(); - - server - .listen() - .buffer(2) - .subscribe( - messages -> { - for (Message message : messages) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }); - - final CompletableFuture> targetFuture = new CompletableFuture<>(); - client.listen().buffer(2).subscribe(targetFuture::complete); - - Message q1 = Message.withData("q1").build(); - Message q2 = Message.withData("q2").build(); - - client - .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - client - .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - - List target = targetFuture.get(1, TimeUnit.SECONDS); - assertNotNull(target); - assertEquals(2, target.size()); - } - - @Test - public void testCompleteObserver() throws Exception { - server = createTcpTransport(); - client = createTcpTransport(); - - final CompletableFuture completeLatch = new CompletableFuture<>(); - final CompletableFuture messageLatch = new CompletableFuture<>(); - - server - .listen() - .subscribe( - messageLatch::complete, - errorConsumer -> { - // no-op - }, - () -> completeLatch.complete(true)); - - client.send(server.address(), Message.withData("q").build()).block(Duration.ofSeconds(1)); - - assertNotNull(messageLatch.get(1, TimeUnit.SECONDS)); - - server.stop().block(TIMEOUT); - - assertTrue(completeLatch.get(1, TimeUnit.SECONDS)); - } - - @Test - public void testObserverThrowsException() throws Exception { - server = createTcpTransport(); - client = createTcpTransport(); - - server - .listen() - .subscribe( - message -> { - String qualifier = message.data(); - if (qualifier.startsWith("throw")) { - throw new RuntimeException("" + message); - } - if (qualifier.startsWith("q")) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }, - Throwable::printStackTrace); - - // send "throw" and raise exception on server subscriber - final CompletableFuture messageFuture0 = new CompletableFuture<>(); - client.listen().subscribe(messageFuture0::complete); - Message message = Message.withData("throw").build(); - client - .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - Message message0 = null; - try { - message0 = messageFuture0.get(1, TimeUnit.SECONDS); - } catch (TimeoutException e) { - // ignore since expected behavior - } - assertNull(message0); - - // send normal message and check whether server subscriber is broken (no response) - final CompletableFuture messageFuture1 = new CompletableFuture<>(); - client.listen().subscribe(messageFuture1::complete); - client.send(server.address(), Message.withData("q").build()); - Message transportMessage1 = null; - try { - transportMessage1 = messageFuture1.get(1, TimeUnit.SECONDS); - } catch (TimeoutException e) { - // ignore since expected behavior - } - assertNull(transportMessage1); - } - - @Test - public void testBlockAndUnblockTraffic() throws Exception { - client = createTcpTransport(); - server = createTcpTransport(); - - server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); - - Sinks.Many responses = Sinks.many().replay().all(); - client - .listen() - .subscribe(responses::tryEmitNext, responses::tryEmitError, responses::tryEmitComplete); - - // test at unblocked transport - send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe(); - - // then block client->server messages - Thread.sleep(1000); - client.networkEmulator().blockOutbound(server.address()); - send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe(); - - StepVerifier.create(responses.asFlux()) - .assertNext(message -> assertEquals("q/unblocked", message.qualifier())) - .expectNoEvent(Duration.ofMillis(300)) - .thenCancel() - .verify(TIMEOUT); - } -} diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java deleted file mode 100644 index ab17c5b1..00000000 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportSendOrderTest.java +++ /dev/null @@ -1,252 +0,0 @@ -package io.scalecube.transport.netty.websocket; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.cluster.transport.api.Transport; -import io.scalecube.net.Address; -import io.scalecube.transport.netty.BaseTest; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.LongSummaryStatistics; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.stream.LongStream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import reactor.core.Disposable; -import reactor.core.Exceptions; - -public class WebsocketTransportSendOrderTest extends BaseTest { - - // Auto-destroyed on tear down - private Transport client; - private Transport server; - - /** Tear down. */ - @AfterEach - public final void tearDown() { - destroyTransport(client); - destroyTransport(server); - } - - @Test - public void testSendOrderSingleThreadWithoutPromises(TestInfo testInfo) throws Exception { - server = createWebsocketTransport(); - - int iterationNum = 11; // +1 warm up iteration - int sentPerIteration = 1000; - long[] iterationTimeSeries = new long[iterationNum - 1]; - for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - - client = createWebsocketTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(sentPerIteration); - - final Disposable serverSubscriber = - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - long startAt = System.currentTimeMillis(); - for (int j = 0; j < sentPerIteration; j++) { - Message message = Message.withQualifier("q" + j).build(); - client - .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - latch.await(20, TimeUnit.SECONDS); - long iterationTime = System.currentTimeMillis() - startAt; - if (i > 0) { // exclude warm up iteration - iterationTimeSeries[i - 1] = iterationTime; - } - assertSendOrder(sentPerIteration, received); - - LOGGER.debug("Iteration time: {} ms", iterationTime); - - serverSubscriber.dispose(); - destroyTransport(client); - } - - LongSummaryStatistics iterationTimeStats = - LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); - } - - @Test - public void testSendOrderSingleThread(TestInfo testInfo) throws Exception { - server = createWebsocketTransport(); - - int iterationNum = 11; // +1 warm up iteration - int sentPerIteration = 1000; - long[] iterationTimeSeries = new long[iterationNum - 1]; - List totalSentTimeSeries = new ArrayList<>(sentPerIteration * (iterationNum - 1)); - for (int i = 0; i < iterationNum; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - List iterSentTimeSeries = new ArrayList<>(sentPerIteration); - - client = createWebsocketTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(sentPerIteration); - - final Disposable serverSubscriber = - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - long startAt = System.currentTimeMillis(); - for (int j = 0; j < sentPerIteration; j++) { - long sentAt = System.currentTimeMillis(); - Message message = Message.withQualifier("q" + j).build(); - client - .send(server.address(), message) - .subscribe( - avoid -> iterSentTimeSeries.add(System.currentTimeMillis() - sentAt), - th -> - LOGGER.error( - "Failed to send message in {} ms", - System.currentTimeMillis() - sentAt, - th)); - } - - latch.await(20, TimeUnit.SECONDS); - long iterationTime = System.currentTimeMillis() - startAt; - if (i > 0) { // exclude warm up iteration - iterationTimeSeries[i - 1] = iterationTime; - } - assertSendOrder(sentPerIteration, received); - - Thread.sleep(10); // await a bit for last msg confirmation - - LongSummaryStatistics iterSentTimeStats = - iterSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - if (i == 0) { // warm up iteration - LOGGER.debug("Warm up iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats warm up iter (ms): {}", iterSentTimeStats); - } else { - totalSentTimeSeries.addAll(iterSentTimeSeries); - LongSummaryStatistics totalSentTimeStats = - totalSentTimeSeries.stream().mapToLong(v -> v).summaryStatistics(); - LOGGER.debug("Iteration time: {} ms", iterationTime); - LOGGER.debug("Sent time stats iter (ms): {}", iterSentTimeStats); - LOGGER.debug("Sent time stats total (ms): {}", totalSentTimeStats); - } - - serverSubscriber.dispose(); - destroyTransport(client); - } - - LongSummaryStatistics iterationTimeStats = - LongStream.of(iterationTimeSeries).summaryStatistics(); - LOGGER.debug("Iteration time stats (ms): {}", iterationTimeStats); - } - - @Test - public void testSendOrderMultiThread(TestInfo testInfo) throws Exception { - Transport server = createWebsocketTransport(); - - final int total = 1000; - for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - ExecutorService exec = - Executors.newFixedThreadPool( - 4, - r -> { - Thread thread = new Thread(r); - thread.setName("testSendOrderMultiThread"); - thread.setDaemon(true); - return thread; - }); - - Transport client = createWebsocketTransport(); - final List received = new ArrayList<>(); - final CountDownLatch latch = new CountDownLatch(4 * total); - server - .listen() - .subscribe( - message -> { - received.add(message); - latch.countDown(); - }); - - final Future f0 = exec.submit(sender(0, client, server.address(), total)); - final Future f1 = exec.submit(sender(1, client, server.address(), total)); - final Future f2 = exec.submit(sender(2, client, server.address(), total)); - final Future f3 = exec.submit(sender(3, client, server.address(), total)); - - latch.await(20, TimeUnit.SECONDS); - - f0.get(1, TimeUnit.SECONDS); - f1.get(1, TimeUnit.SECONDS); - f2.get(1, TimeUnit.SECONDS); - f3.get(1, TimeUnit.SECONDS); - - exec.shutdownNow(); - - assertSenderOrder(0, total, received); - assertSenderOrder(1, total, received); - assertSenderOrder(2, total, received); - assertSenderOrder(3, total, received); - - destroyTransport(client); - } - - destroyTransport(client); - destroyTransport(server); - } - - private void assertSendOrder(int total, List received) { - ArrayList messages = new ArrayList<>(received); - assertEquals(total, messages.size()); - for (int k = 0; k < total; k++) { - assertEquals("q" + k, messages.get(k).qualifier()); - } - } - - private Callable sender(int id, Transport client, Address address, int total) { - return () -> { - for (int j = 0; j < total; j++) { - String correlationId = id + "/" + j; - try { - Message message = Message.withQualifier("q").correlationId(correlationId).build(); - client.send(address, message).block(Duration.ofSeconds(3)); - } catch (Exception e) { - LOGGER.error("Failed to send message: j = {} id = {}", j, id, e); - throw Exceptions.propagate(e); - } - } - return null; - }; - } - - private void assertSenderOrder(int id, int total, List received) { - ArrayList messages = new ArrayList<>(received); - Map> group = new HashMap<>(); - for (Message message : messages) { - Integer key = Integer.valueOf(message.correlationId().split("/")[0]); - group.computeIfAbsent(key, ArrayList::new).add(message); - } - - assertEquals(total, group.get(id).size()); - for (int k = 0; k < total; k++) { - assertEquals(id + "/" + k, group.get(id).get(k).correlationId()); - } - } -} diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java deleted file mode 100644 index 050474af..00000000 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ /dev/null @@ -1,344 +0,0 @@ -package io.scalecube.transport.netty.websocket; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -import io.scalecube.cluster.transport.api.Message; -import io.scalecube.cluster.utils.NetworkEmulatorTransport; -import io.scalecube.net.Address; -import io.scalecube.transport.netty.BaseTest; -import java.io.IOException; -import java.net.UnknownHostException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; -import reactor.test.StepVerifier; - -public class WebsocketTransportTest extends BaseTest { - - public static final Duration TIMEOUT = Duration.ofSeconds(10); - - // Auto-destroyed on tear down - private NetworkEmulatorTransport client; - private NetworkEmulatorTransport server; - - /** Tear down. */ - @AfterEach - public final void tearDown() { - destroyTransport(client); - destroyTransport(server); - } - - @Test - public void testUnresolvedHostConnection() { - client = createWebsocketTransport(); - // create transport with wrong host - try { - Address address = Address.from("wronghost:49255"); - Message message = Message.withData("q").build(); - client.send(address, message).block(Duration.ofSeconds(20)); - fail("fail"); - } catch (Exception e) { - assertEquals( - UnknownHostException.class, e.getCause().getClass(), "Unexpected exception class"); - } - } - - @Test - public void testInteractWithNoConnection(TestInfo testInfo) { - Address serverAddress = Address.from("localhost:49255"); - for (int i = 0; i < 10; i++) { - LOGGER.debug("####### {} : iteration = {}", testInfo.getDisplayName(), i); - - client = createWebsocketTransport(); - - // create transport and don't wait just send message - try { - Message msg = Message.withData("q").build(); - client.send(serverAddress, msg).block(Duration.ofSeconds(3)); - fail("fail"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); - } - - // send second message: no connection yet and it's clear that there's no connection - try { - Message msg = Message.withData("q").build(); - client.send(serverAddress, msg).block(Duration.ofSeconds(3)); - fail("fail"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof IOException, "Unexpected exception type: " + e); - } - - destroyTransport(client); - } - } - - @Test - public void testPingPongClientTfListenAndServerTfListen() throws Exception { - client = createWebsocketTransport(); - server = createWebsocketTransport(); - - server - .listen() - .subscribe( - message -> { - Address address = message.sender(); - assertEquals(client.address(), address, "Expected clientAddress"); - send(server, address, Message.fromQualifier("hi client")).subscribe(); - }); - - CompletableFuture messageFuture = new CompletableFuture<>(); - client.listen().subscribe(messageFuture::complete); - - send(client, server.address(), Message.fromQualifier("hello server")).subscribe(); - - Message result = messageFuture.get(3, TimeUnit.SECONDS); - assertNotNull(result, "No response from serverAddress"); - assertEquals("hi client", result.qualifier()); - } - - @Test - public void testNetworkSettings() { - client = createWebsocketTransport(); - server = createWebsocketTransport(); - - int lostPercent = 50; - int mean = 0; - client.networkEmulator().outboundSettings(server.address(), lostPercent, mean); - - final List serverMessageList = new ArrayList<>(); - server.listen().subscribe(serverMessageList::add); - - int total = 1000; - Flux.range(0, total) - .flatMap(i -> client.send(server.address(), Message.withData("q" + i).build())) - .onErrorContinue((th, o) -> {}) - .blockLast(TIMEOUT); - - int expectedMax = - total / 100 * lostPercent + total / 100 * 5; // +5% for maximum possible lost messages - int size = serverMessageList.size(); - assertTrue(size < expectedMax, "expectedMax=" + expectedMax + ", actual size=" + size); - } - - @Test - public void testPingPongOnSingleChannel() throws Exception { - server = createWebsocketTransport(); - client = createWebsocketTransport(); - - server - .listen() - .buffer(2) - .subscribe( - messages -> { - for (Message message : messages) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }); - - final CompletableFuture> targetFuture = new CompletableFuture<>(); - client.listen().buffer(2).subscribe(targetFuture::complete); - - Message q1 = Message.withData("q1").build(); - Message q2 = Message.withData("q2").build(); - - client - .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - client - .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - - List target = targetFuture.get(1, TimeUnit.SECONDS); - assertNotNull(target); - assertEquals(2, target.size()); - } - - @Test - public void testShouldRequestResponseSuccess() { - client = createWebsocketTransport(); - server = createWebsocketTransport(); - - server - .listen() - .filter(req -> req.qualifier().equals("hello/server")) - .subscribe( - message -> - send( - server, - message.sender(), - Message.builder() - .correlationId(message.correlationId()) - .data("hello: " + message.data()) - .build()) - .subscribe()); - - String result = - client - .requestResponse( - server.address(), - Message.builder() - .qualifier("hello/server") - .correlationId("123xyz") - .data("server") - .build()) - .map(msg -> msg.data().toString()) - .block(Duration.ofSeconds(1)); - - assertEquals("hello: server", result); - } - - @Test - public void testPingPongOnSeparateChannel() throws Exception { - server = createWebsocketTransport(); - client = createWebsocketTransport(); - - server - .listen() - .buffer(2) - .subscribe( - messages -> { - for (Message message : messages) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }); - - final CompletableFuture> targetFuture = new CompletableFuture<>(); - client.listen().buffer(2).subscribe(targetFuture::complete); - - Message q1 = Message.withData("q1").build(); - Message q2 = Message.withData("q2").build(); - - client - .send(server.address(), q1) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - client - .send(server.address(), q2) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - - List target = targetFuture.get(1, TimeUnit.SECONDS); - assertNotNull(target); - assertEquals(2, target.size()); - } - - @Test - public void testCompleteObserver() throws Exception { - server = createWebsocketTransport(); - client = createWebsocketTransport(); - - final CompletableFuture completeLatch = new CompletableFuture<>(); - final CompletableFuture messageLatch = new CompletableFuture<>(); - - server - .listen() - .subscribe( - messageLatch::complete, - errorConsumer -> { - // no-op - }, - () -> completeLatch.complete(true)); - - client.send(server.address(), Message.withData("q").build()).block(Duration.ofSeconds(1)); - - assertNotNull(messageLatch.get(1, TimeUnit.SECONDS)); - - server.stop().block(TIMEOUT); - - assertTrue(completeLatch.get(1, TimeUnit.SECONDS)); - } - - @Test - public void testObserverThrowsException() throws Exception { - server = createWebsocketTransport(); - client = createWebsocketTransport(); - - server - .listen() - .subscribe( - message -> { - String qualifier = message.data(); - if (qualifier.startsWith("throw")) { - throw new RuntimeException("" + message); - } - if (qualifier.startsWith("q")) { - Message echo = Message.withData("echo/" + message.qualifier()).build(); - server - .send(message.sender(), echo) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - } - }, - Throwable::printStackTrace); - - // send "throw" and raise exception on server subscriber - final CompletableFuture messageFuture0 = new CompletableFuture<>(); - client.listen().subscribe(messageFuture0::complete); - Message message = Message.withData("throw").build(); - client - .send(server.address(), message) - .subscribe(null, th -> LOGGER.error("Failed to send message", th)); - Message message0 = null; - try { - message0 = messageFuture0.get(1, TimeUnit.SECONDS); - } catch (TimeoutException e) { - // ignore since expected behavior - } - assertNull(message0); - - // send normal message and check whether server subscriber is broken (no response) - final CompletableFuture messageFuture1 = new CompletableFuture<>(); - client.listen().subscribe(messageFuture1::complete); - client.send(server.address(), Message.withData("q").build()); - Message transportMessage1 = null; - try { - transportMessage1 = messageFuture1.get(1, TimeUnit.SECONDS); - } catch (TimeoutException e) { - // ignore since expected behavior - } - assertNull(transportMessage1); - } - - @Test - public void testBlockAndUnblockTraffic() throws Exception { - client = createWebsocketTransport(); - server = createWebsocketTransport(); - - server.listen().subscribe(message -> server.send(message.sender(), message).subscribe()); - - Sinks.Many responses = Sinks.many().replay().all(); - client - .listen() - .subscribe(responses::tryEmitNext, responses::tryEmitError, responses::tryEmitComplete); - - // test at unblocked transport - send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe(); - - // then block client->server messages - Thread.sleep(1000); - client.networkEmulator().blockOutbound(server.address()); - send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe(); - - StepVerifier.create(responses.asFlux()) - .assertNext(message -> assertEquals("q/unblocked", message.qualifier())) - .expectNoEvent(Duration.ofMillis(300)) - .thenCancel() - .verify(TIMEOUT); - } -}