From 8b17c78315ae5efecdfd5bee5e3ac9eae40759e2 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Mon, 24 Jun 2024 20:16:35 +0300 Subject: [PATCH] WIP --- .../gossip/{GossipState.java => Gossip.java} | 16 ++++++- .../cluster2/gossip/GossipCodec.java | 8 ++-- .../cluster2/gossip/GossipProtocol.java | 43 ++++++++----------- .../cluster2/gossip/GossipRequestCodec.java | 4 +- 4 files changed, 39 insertions(+), 32 deletions(-) rename cluster2/src/main/java/io/scalecube/cluster2/gossip/{GossipState.java => Gossip.java} (71%) diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipState.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java similarity index 71% rename from cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipState.java rename to cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java index f56700c3..516b4ce6 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipState.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/Gossip.java @@ -2,9 +2,10 @@ import java.util.HashSet; import java.util.Set; +import java.util.StringJoiner; import java.util.UUID; -public class GossipState { +public class Gossip { /** Gossip id, local member id. */ private final UUID gossiperId; @@ -29,7 +30,7 @@ public class GossipState { * @param message message. * @param infectionPeriod infectionPeriod. */ - public GossipState(UUID gossiperId, long sequenceId, byte[] message, long infectionPeriod) { + public Gossip(UUID gossiperId, long sequenceId, byte[] message, long infectionPeriod) { this.gossiperId = gossiperId; this.sequenceId = sequenceId; this.message = message; @@ -63,4 +64,15 @@ public void addToInfected(UUID memberId) { public boolean isInfected(UUID memberId) { return infectedSet.contains(memberId); } + + @Override + public String toString() { + return new StringJoiner(", ", Gossip.class.getSimpleName() + "[", "]") + .add("gossiperId=" + gossiperId) + .add("sequenceId=" + sequenceId) + .add("message[" + message.length + "]") + .add("infectionPeriod=" + infectionPeriod) + .add("infectedSet[" + infectedSet.size() + "]") + .toString(); + } } diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java index 5432a630..2d305506 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipCodec.java @@ -11,13 +11,13 @@ public class GossipCodec extends AbstractCodec { public GossipCodec() {} - public MutableDirectBuffer encode(GossipState gossipState) { + public MutableDirectBuffer encode(Gossip gossip) { encodedLength = 0; gossipEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder); - UUIDCodec.encode(gossipState.gossiperId(), gossipEncoder.gossiperId()); - gossipEncoder.sequenceId(gossipState.sequenceId()); - gossipEncoder.putMessage(gossipState.message(), 0, gossipState.message().length); + UUIDCodec.encode(gossip.gossiperId(), gossipEncoder.gossiperId()); + gossipEncoder.sequenceId(gossip.sequenceId()); + gossipEncoder.putMessage(gossip.message(), 0, gossip.message().length); encodedLength = headerEncoder.encodedLength() + gossipEncoder.encodedLength(); return encodedBuffer; diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java index 136f1209..6b6c2402 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipProtocol.java @@ -38,17 +38,16 @@ public class GossipProtocol extends AbstractAgent { private final GossipDecoder gossipDecoder = new GossipDecoder(); private final MembershipEventDecoder membershipEventDecoder = new MembershipEventDecoder(); private final GossipRequestCodec gossipRequestCodec = new GossipRequestCodec(); - private final GossipCodec gossipCodec = new GossipCodec(); private final MemberCodec memberCodec = new MemberCodec(); private final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(); private final String roleName; private long currentPeriod = 0; private long gossipCounter = 0; private final Map sequenceIdCollectors = new Object2ObjectHashMap<>(); - private final Map gossips = new Object2ObjectHashMap<>(); + private final Map gossips = new Object2ObjectHashMap<>(); private final List remoteMembers = new ArrayList<>(); private final List gossipMembers = new ArrayList<>(); - private final List gossipsToSend = new ArrayList<>(); + private final List gossipsToSend = new ArrayList<>(); private final List gossipsToRemove = new ArrayList<>(); public GossipProtocol( @@ -125,9 +124,9 @@ private void nextGossipsToRemove(long period) { final int periodsToSweep = ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1); - for (final GossipState gossipState : gossips.values()) { - if (period > gossipState.infectionPeriod() + periodsToSweep) { - gossipsToRemove.add(gossipState.gossipId()); + for (final Gossip gossip : gossips.values()) { + if (period > gossip.infectionPeriod() + periodsToSweep) { + gossipsToRemove.add(gossip.gossipId()); } } } @@ -152,12 +151,9 @@ private void spreadGossips(long period, Member member) { final UUID from = localMember.id(); for (int i = 0, n = gossipsToSend.size(); i < n; i++) { - final GossipState gossipState = gossipsToSend.get(i); + final Gossip gossip = gossipsToSend.get(i); transport.send( - address, - gossipRequestCodec.encode(from, gossipState), - 0, - gossipRequestCodec.encodedLength()); + address, gossipRequestCodec.encode(from, gossip), 0, gossipRequestCodec.encodedLength()); } } @@ -167,10 +163,9 @@ private void nextGossipsToSend(long period, Member member) { final int periodsToSpread = ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1); - for (final GossipState gossipState : gossips.values()) { - if (gossipState.infectionPeriod() + periodsToSpread >= period - && !gossipState.isInfected(member.id())) { - gossipsToSend.add(gossipState); + for (final Gossip gossip : gossips.values()) { + if (gossip.infectionPeriod() + periodsToSpread >= period && !gossip.isInfected(member.id())) { + gossipsToSend.add(gossip); } } } @@ -202,10 +197,10 @@ private void onGossipMessage(GossipMessageDecoder decoder) { final byte[] message = new byte[messageLength]; decoder.getMessage(message, 0, messageLength); - final GossipState gossipState = new GossipState(localMember.id(), sequenceId, message, period); + final Gossip gossip = new Gossip(localMember.id(), sequenceId, message, period); - gossips.put(gossipState.gossipId(), gossipState); - ensureSequence(localMember.id()).add(gossipState.sequenceId()); + gossips.put(gossip.gossipId(), gossip); + ensureSequence(localMember.id()).add(gossip.sequenceId()); } private void onGossipRequest(GossipRequestDecoder decoder) { @@ -223,18 +218,18 @@ private void onGossipRequest(GossipRequestDecoder decoder) { gossipDecoder.getMessage(message, 0, messageLength); final String gossipId = gossiperId + "-" + sequenceId; - GossipState gossipState = gossips.get(gossipId); + Gossip gossip = gossips.get(gossipId); if (ensureSequence(gossiperId).add(sequenceId)) { - if (gossipState == null) { // new gossip - gossipState = new GossipState(gossiperId, sequenceId, message, period); - gossips.put(gossipId, gossipState); + if (gossip == null) { // new gossip + gossip = new Gossip(gossiperId, sequenceId, message, period); + gossips.put(gossipId, gossip); emitGossipMessage(message); } } - if (gossipState != null) { - gossipState.addToInfected(from); + if (gossip != null) { + gossip.addToInfected(from); } } diff --git a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java index 084c50c4..bce28d49 100644 --- a/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java +++ b/cluster2/src/main/java/io/scalecube/cluster2/gossip/GossipRequestCodec.java @@ -13,12 +13,12 @@ public class GossipRequestCodec extends AbstractCodec { public GossipRequestCodec() {} - public MutableDirectBuffer encode(UUID from, GossipState gossipState) { + public MutableDirectBuffer encode(UUID from, Gossip gossip) { encodedLength = 0; gossipRequestEncoder.wrapAndApplyHeader(encodedBuffer, 0, headerEncoder); UUIDCodec.encode(from, gossipRequestEncoder.from()); - gossipRequestEncoder.putGossip(gossipCodec.encode(gossipState), 0, gossipCodec.encodedLength()); + gossipRequestEncoder.putGossip(gossipCodec.encode(gossip), 0, gossipCodec.encodedLength()); encodedLength = headerEncoder.encodedLength() + gossipRequestEncoder.encodedLength(); return encodedBuffer;