Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into transportApi2
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 27, 2024
2 parents 6d1b71c + 270366f commit 08ba859
Show file tree
Hide file tree
Showing 39 changed files with 518 additions and 332 deletions.
8 changes: 3 additions & 5 deletions cluster-api/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -12,10 +14,6 @@
<name>ScaleCube/ClusterApi</name>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-commons</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>scalecube-transport-api</artifactId>
Expand Down
9 changes: 4 additions & 5 deletions cluster-api/src/main/java/io/scalecube/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.Optional;
import reactor.core.publisher.Mono;
Expand All @@ -10,11 +9,11 @@
public interface Cluster {

/**
* Returns {@link Address} of this cluster instance.
* Returns address of this cluster instance.
*
* @return cluster address
*/
Address address();
String address();

/**
* Spreads given message between cluster members using gossiping protocol.
Expand Down Expand Up @@ -52,15 +51,15 @@ public interface Cluster {
*
* @return member by id
*/
Optional<Member> member(String id);
Optional<Member> memberById(String id);

/**
* Returns cluster member by given address or null if no member with such address exists at joined
* cluster.
*
* @return member by address
*/
Optional<Member> member(Address address);
Optional<Member> memberByAddress(String address);

/**
* Returns list of all members of the joined cluster. This will include all cluster members
Expand Down
11 changes: 5 additions & 6 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster;

import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.net.Address;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
Expand All @@ -20,7 +19,7 @@ public final class Member implements Externalizable {

private String id;
private String alias;
private Address address;
private String address;
private String namespace;

public Member() {}
Expand All @@ -33,7 +32,7 @@ public Member() {}
* @param address member address; not null
* @param namespace namespace; not null
*/
public Member(String id, String alias, Address address, String namespace) {
public Member(String id, String alias, String address, String namespace) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
Expand Down Expand Up @@ -76,7 +75,7 @@ public String namespace() {
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @return member address
*/
public Address address() {
public String address() {
return address;
}

Expand Down Expand Up @@ -110,7 +109,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
out.writeUTF(address);
// namespace
out.writeUTF(namespace);
}
Expand All @@ -125,7 +124,7 @@ public void readExternal(ObjectInput in) throws IOException {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
address = in.readUTF();
// namespace
this.namespace = in.readUTF();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.scalecube.cluster.membership;

import io.scalecube.net.Address;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,7 +23,7 @@ public final class MembershipConfig implements Cloneable {
public static final int DEFAULT_LOCAL_SUSPICION_MULT = 3;
public static final int DEFAULT_LOCAL_SYNC_INTERVAL = 15_000;

private List<Address> seedMembers = Collections.emptyList();
private List<String> seedMembers = Collections.emptyList();
private int syncInterval = DEFAULT_SYNC_INTERVAL;
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
Expand Down Expand Up @@ -67,7 +66,7 @@ public static MembershipConfig defaultLocalConfig() {
.syncInterval(DEFAULT_LOCAL_SYNC_INTERVAL);
}

public List<Address> seedMembers() {
public List<String> seedMembers() {
return seedMembers;
}

Expand All @@ -77,7 +76,7 @@ public List<Address> seedMembers() {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(Address... seedMembers) {
public MembershipConfig seedMembers(String... seedMembers) {
return seedMembers(Arrays.asList(seedMembers));
}

Expand All @@ -87,7 +86,7 @@ public MembershipConfig seedMembers(Address... seedMembers) {
* @param seedMembers seed members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig seedMembers(List<Address> seedMembers) {
public MembershipConfig seedMembers(List<String> seedMembers) {
MembershipConfig m = clone();
m.seedMembers = Collections.unmodifiableList(new ArrayList<>(seedMembers));
return m;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.scalecube.cluster.metadata;

import io.scalecube.utils.ServiceLoaderUtil;
import java.nio.ByteBuffer;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;

/** Contains methods for metadata serializing/deserializing logic. */
public interface MetadataCodec {

MetadataCodec INSTANCE =
ServiceLoaderUtil.findFirst(MetadataCodec.class).orElseGet(JdkMetadataCodec::new);
StreamSupport.stream(ServiceLoader.load(MetadataCodec.class).spliterator(), false)
.findFirst()
.orElseGet(JdkMetadataCodec::new);

/**
* Deserializes metadata from buffer.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.utils;

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -30,21 +29,21 @@ public final class NetworkEmulator {
private volatile OutboundSettings defaultOutboundSettings = new OutboundSettings(0, 0);
private volatile InboundSettings defaultInboundSettings = new InboundSettings(true);

private final Map<Address, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<Address, InboundSettings> inboundSettings = new ConcurrentHashMap<>();
private final Map<String, OutboundSettings> outboundSettings = new ConcurrentHashMap<>();
private final Map<String, InboundSettings> inboundSettings = new ConcurrentHashMap<>();

private final AtomicLong totalMessageSentCount = new AtomicLong();
private final AtomicLong totalOutboundMessageLostCount = new AtomicLong();
private final AtomicLong totalInboundMessageLostCount = new AtomicLong();

private final Address address;
private final String address;

/**
* Creates new instance of network emulator.
*
* @param address local address
*/
NetworkEmulator(Address address) {
NetworkEmulator(String address) {
this.address = address;
}

Expand All @@ -56,7 +55,7 @@ public final class NetworkEmulator {
* @param destination address of target endpoint
* @return network outbound settings
*/
public OutboundSettings outboundSettings(Address destination) {
public OutboundSettings outboundSettings(String destination) {
return outboundSettings.getOrDefault(destination, defaultOutboundSettings);
}

Expand All @@ -67,7 +66,7 @@ public OutboundSettings outboundSettings(Address destination) {
* @param lossPercent loss in percents
* @param meanDelay mean delay
*/
public void outboundSettings(Address destination, int lossPercent, int meanDelay) {
public void outboundSettings(String destination, int lossPercent, int meanDelay) {
OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay);
outboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -103,7 +102,7 @@ public void unblockAllOutbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Address... destinations) {
public void blockOutbound(String... destinations) {
blockOutbound(Arrays.asList(destinations));
}

Expand All @@ -112,8 +111,8 @@ public void blockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockOutbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockOutbound(Collection<String> destinations) {
for (String destination : destinations) {
outboundSettings.put(destination, new OutboundSettings(100, 0));
}
LOGGER.debug("[{}] Blocked outbound to {}", address, destinations);
Expand All @@ -124,7 +123,7 @@ public void blockOutbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Address... destinations) {
public void unblockOutbound(String... destinations) {
unblockOutbound(Arrays.asList(destinations));
}

Expand All @@ -133,7 +132,7 @@ public void unblockOutbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockOutbound(Collection<Address> destinations) {
public void unblockOutbound(Collection<String> destinations) {
destinations.forEach(outboundSettings::remove);
LOGGER.debug("[{}] Unblocked outbound {}", address, destinations);
}
Expand Down Expand Up @@ -164,7 +163,7 @@ public long totalOutboundMessageLostCount() {
* @param address target address
* @return mono message
*/
public Mono<Message> tryFailOutbound(Message msg, Address address) {
public Mono<Message> tryFailOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -187,7 +186,7 @@ public Mono<Message> tryFailOutbound(Message msg, Address address) {
* @param address target address
* @return mono message
*/
public Mono<Message> tryDelayOutbound(Message msg, Address address) {
public Mono<Message> tryDelayOutbound(Message msg, String address) {
return Mono.defer(
() -> {
totalMessageSentCount.incrementAndGet();
Expand All @@ -209,7 +208,7 @@ public Mono<Message> tryDelayOutbound(Message msg, Address address) {
* @param destination address of target endpoint
* @return network inbound settings
*/
public InboundSettings inboundSettings(Address destination) {
public InboundSettings inboundSettings(String destination) {
return inboundSettings.getOrDefault(destination, defaultInboundSettings);
}

Expand All @@ -218,7 +217,7 @@ public InboundSettings inboundSettings(Address destination) {
*
* @param shallPass shallPass inbound flag
*/
public void inboundSettings(Address destination, boolean shallPass) {
public void inboundSettings(String destination, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
Expand Down Expand Up @@ -253,7 +252,7 @@ public void unblockAllInbound() {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Address... destinations) {
public void blockInbound(String... destinations) {
blockInbound(Arrays.asList(destinations));
}

Expand All @@ -262,8 +261,8 @@ public void blockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void blockInbound(Collection<Address> destinations) {
for (Address destination : destinations) {
public void blockInbound(Collection<String> destinations) {
for (String destination : destinations) {
inboundSettings.put(destination, new InboundSettings(false));
}
LOGGER.debug("[{}] Blocked inbound from {}", address, destinations);
Expand All @@ -274,7 +273,7 @@ public void blockInbound(Collection<Address> destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Address... destinations) {
public void unblockInbound(String... destinations) {
unblockInbound(Arrays.asList(destinations));
}

Expand All @@ -283,7 +282,7 @@ public void unblockInbound(Address... destinations) {
*
* @param destinations collection of target endpoints where to apply
*/
public void unblockInbound(Collection<Address> destinations) {
public void unblockInbound(Collection<String> destinations) {
destinations.forEach(inboundSettings::remove);
LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -26,7 +25,7 @@ public NetworkEmulator networkEmulator() {
}

@Override
public Address address() {
public String address() {
return transport.address();
}

Expand All @@ -46,7 +45,7 @@ public boolean isStopped() {
}

@Override
public Mono<Void> send(Address address, Message message) {
public Mono<Void> send(String address, Message message) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(message))
Expand All @@ -56,7 +55,7 @@ public Mono<Void> send(Address address, Message message) {
}

@Override
public Mono<Message> requestResponse(Address address, Message request) {
public Mono<Message> requestResponse(String address, Message request) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(request))
Expand Down
Loading

0 comments on commit 08ba859

Please sign in to comment.