Skip to content

Commit

Permalink
Added tests for PayloadGenerationUpdated
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Jul 30, 2024
1 parent e315f27 commit 72afa0c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public void put(MembershipRecord record) {
return;
}

tryUpdatePayloadGeneration(record);
if (!localMember.equals(member) && record.generation() > currRecord.generation()) {
emitPayloadGenerationUpdated(record);
}

if (record.incarnation() < currRecord.incarnation()) {
return;
Expand Down Expand Up @@ -117,15 +119,6 @@ public void updatePayloadGeneration(long generation, int payloadLength) {
localRecord.generation(generation).payloadLength(payloadLength);
}

private void tryUpdatePayloadGeneration(MembershipRecord record) {
final Member member = record.member();
final UUID key = member.id();
final MembershipRecord currRecord = recordMap.get(key);
if (currRecord.generation() < record.generation()) {
emitPayloadGenerationUpdated(record);
}
}

private void emitPayloadGenerationUpdated(MembershipRecord record) {
final UUID memberId = record.member().id();
final long generation = record.generation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.scalecube.cluster2.Member;
import io.scalecube.cluster2.fdetector.FailureDetectorCodec;
import io.scalecube.cluster2.gossip.GossipMessageCodec;
import io.scalecube.cluster2.payload.PayloadCodec;
import io.scalecube.cluster2.sbe.MemberStatus;
import java.lang.reflect.Field;
import java.util.UUID;
Expand Down Expand Up @@ -190,6 +191,37 @@ void testOnFailureDetectorEvent() {
eq(memberStatus));
}

@Test
void testOnPayloadGenerationUpdated() {
final Member member = localRecord.member();

assertEquals(0, localRecord.generation(), "generation");
assertEquals(0, localRecord.payloadLength(), "payloadLength");

final int generation = 1;
final int payloadLength = 100;
emitPayloadGenerationUpdated(
codec -> codec.encodePayloadGenerationUpdated(member.id(), generation, payloadLength));

assertEquals(generation, localRecord.generation(), "generation");
assertEquals(payloadLength, localRecord.payloadLength(), "payloadLength");
}

@Test
void testOnPayloadGenerationUpdatedForNonLocalMember() {
assertEquals(0, localRecord.generation(), "generation");
assertEquals(0, localRecord.payloadLength(), "payloadLength");

final int generation = 1;
final int payloadLength = 100;
final UUID nonLocalMemberId = UUID.randomUUID();
emitPayloadGenerationUpdated(
codec -> codec.encodePayloadGenerationUpdated(nonLocalMemberId, generation, payloadLength));

assertEquals(0, localRecord.generation(), "generation");
assertEquals(0, localRecord.payloadLength(), "payloadLength");
}

private void advanceClock(final long millis) {
epochClock.advance(millis);
membershipProtocol.doWork();
Expand Down Expand Up @@ -230,11 +262,11 @@ private void setMembershipProtocolPeriod(long val) {
}

private void emitSync(Function<SyncCodec, MutableDirectBuffer> function) {
final SyncCodec codec = new SyncCodec();
final SyncCodec syncCodec = new SyncCodec();
doAnswer(
invocation -> {
final MessageHandler messageHandler = (MessageHandler) invocation.getArguments()[0];
messageHandler.onMessage(1, function.apply(codec), 0, codec.encodedLength());
messageHandler.onMessage(1, function.apply(syncCodec), 0, syncCodec.encodedLength());
return 1;
})
.when(messagePoller)
Expand All @@ -243,11 +275,11 @@ private void emitSync(Function<SyncCodec, MutableDirectBuffer> function) {
}

private void emitSyncAck(Function<SyncCodec, MutableDirectBuffer> function) {
final SyncCodec codec = new SyncCodec();
final SyncCodec syncCodec = new SyncCodec();
doAnswer(
invocation -> {
final MessageHandler messageHandler = (MessageHandler) invocation.getArguments()[0];
messageHandler.onMessage(1, function.apply(codec), 0, codec.encodedLength());
messageHandler.onMessage(1, function.apply(syncCodec), 0, syncCodec.encodedLength());
return 1;
})
.when(messagePoller)
Expand All @@ -256,11 +288,12 @@ private void emitSyncAck(Function<SyncCodec, MutableDirectBuffer> function) {
}

private void emitGossipInputMessage(Function<GossipMessageCodec, MutableDirectBuffer> function) {
final GossipMessageCodec codec = new GossipMessageCodec();
final GossipMessageCodec gossipMessageCodec = new GossipMessageCodec();
doAnswer(
invocation -> {
final MessageHandler messageHandler = (MessageHandler) invocation.getArguments()[0];
messageHandler.onMessage(1, function.apply(codec), 0, codec.encodedLength());
messageHandler.onMessage(
1, function.apply(gossipMessageCodec), 0, gossipMessageCodec.encodedLength());
return 1;
})
.when(messagePoller)
Expand All @@ -270,8 +303,15 @@ private void emitGossipInputMessage(Function<GossipMessageCodec, MutableDirectBu

private void emitFailureDetectorEvent(
Function<FailureDetectorCodec, MutableDirectBuffer> function) {
final FailureDetectorCodec codec = new FailureDetectorCodec();
messageTx.transmit(1, function.apply(codec), 0, codec.encodedLength());
final FailureDetectorCodec failureDetectorCodec = new FailureDetectorCodec();
messageTx.transmit(
1, function.apply(failureDetectorCodec), 0, failureDetectorCodec.encodedLength());
membershipProtocol.doWork();
}

private void emitPayloadGenerationUpdated(Function<PayloadCodec, MutableDirectBuffer> function) {
final PayloadCodec payloadCodec = new PayloadCodec();
messageTx.transmit(1, function.apply(payloadCodec), 0, payloadCodec.encodedLength());
membershipProtocol.doWork();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,52 @@ void testFilterByNamespace() {
assertEquals(localRecord, recordMap.get(localRecord.member().id()));
}

@Test
void testPayloadGenerationUpdated() {
final int generation = 1;
final int payloadLength = 100;

final CopyBroadcastReceiver messageRx = messageRxSupplier.get();
final MembershipRecord record = newRecord();
final MembershipRecord recordWithGreaterGeneration =
copyFrom(record, r -> r.generation(generation).payloadLength(payloadLength));

membershipTable.put(record);
membershipTable.put(recordWithGreaterGeneration);

assertPayloadGenerationUpdated(
messageRx,
decoder -> {
assertNotNull(decoder, "assertPayloadGenerationUpdated");
assertEquals(0, decoder.generation(), "generation");
assertEquals(0, decoder.payloadLength(), "payloadLength");
},
true);
assertPayloadGenerationUpdated(
messageRx,
decoder -> {
assertNotNull(decoder, "assertPayloadGenerationUpdated");
assertEquals(generation, decoder.generation(), "generation");
assertEquals(payloadLength, decoder.payloadLength(), "payloadLength");
},
false);
}

@Test
void testPayloadGenerationNotUpdatedForLocalMember() {
final int generation = 1;
final int payloadLength = 100;

final CopyBroadcastReceiver messageRx = messageRxSupplier.get();
final MembershipRecord localRecordWithGreaterGeneration =
copyFrom(localRecord, r -> r.generation(generation).payloadLength(payloadLength));

membershipTable.put(localRecordWithGreaterGeneration);

assertPayloadGenerationUpdated(
messageRx, decoder -> assertNull(decoder, "assertPayloadGenerationUpdated"), true);
}

private void advanceClock(final long millis) {
epochClock.advance(millis);
membershipTable.doWork();
Expand Down

0 comments on commit 72afa0c

Please sign in to comment.