Skip to content

Commit

Permalink
fix: refactored code and tests to use InstantSource
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Jun 26, 2024
1 parent 606757c commit b4b8245
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import io.grpc.stub.StreamObserver;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.InstantSource;

/**
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
Expand All @@ -37,11 +36,11 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer

private final long timeoutThresholdMillis;

private final Clock producerLivenessClock;
private Instant producerLivenessInstant;
private final InstantSource producerLivenessClock;
private long producerLivenessMillis;

private final Clock consumerLivenessClock;
private Instant consumerLivenessInstant;
private final InstantSource consumerLivenessClock;
private long consumerLivenessMillis;

/**
* Constructor for the LiveStreamObserverImpl class.
Expand All @@ -51,8 +50,8 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
*/
public LiveStreamObserverImpl(
final long timeoutThresholdMillis,
final Clock producerLivenessClock,
final Clock consumerLivenessClock,
final InstantSource producerLivenessClock,
final InstantSource consumerLivenessClock,
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {

Expand All @@ -62,8 +61,8 @@ public LiveStreamObserverImpl(
this.mediator = mediator;
this.responseStreamObserver = responseStreamObserver;

this.producerLivenessInstant = Instant.now(producerLivenessClock);
this.consumerLivenessInstant = Instant.now(consumerLivenessClock);
this.producerLivenessMillis = producerLivenessClock.millis();
this.consumerLivenessMillis = consumerLivenessClock.millis();
}

/**
Expand All @@ -75,14 +74,14 @@ public LiveStreamObserverImpl(
public void notify(final BlockStreamServiceGrpcProto.Block block) {

// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
if (Duration.between(consumerLivenessInstant, Instant.now(consumerLivenessClock)).toMillis() > timeoutThresholdMillis) {
if (consumerLivenessClock.millis() - consumerLivenessMillis > timeoutThresholdMillis) {
if (mediator.isSubscribed(this)) {
LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
}
} else {
// Refresh the producer liveness and pass the block to the observer.
producerLivenessInstant = Instant.now(producerLivenessClock);
producerLivenessMillis = producerLivenessClock.millis();
responseStreamObserver.onNext(block);
}
}
Expand All @@ -95,12 +94,12 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
@Override
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {

if (Duration.between(producerLivenessInstant, Instant.now(producerLivenessClock)).toMillis() > timeoutThresholdMillis) {
if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) {
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
mediator.unsubscribe(this);
} else {
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
consumerLivenessInstant = Instant.now(consumerLivenessClock);
consumerLivenessMillis = consumerLivenessClock.millis();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Clock;
import java.time.Instant;
import java.time.InstantSource;
import java.time.ZoneId;

import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class LiveStreamObserverImplTest {

private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
private final long TEST_TIME = 1719427664950L;

@Mock
private StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;

Expand All @@ -41,9 +47,9 @@ public class LiveStreamObserverImplTest {
@Test
public void testConsumerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
Expand All @@ -57,15 +63,15 @@ public void testConsumerTimeoutWithinWindow() {

@Test
public void testConsumerTimeoutOutsideWindow() throws InterruptedException {

final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

Thread.sleep(51);
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true);
liveStreamObserver.notify(newBlock);
verify(streamMediator).unsubscribe(liveStreamObserver);
Expand All @@ -74,9 +80,9 @@ public void testConsumerTimeoutOutsideWindow() throws InterruptedException {
@Test
public void testProducerTimeoutWithinWindow() {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
TIMEOUT_THRESHOLD_MILLIS,
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

Expand All @@ -90,9 +96,9 @@ public void testProducerTimeoutWithinWindow() {
@Test
public void testProducerTimeoutOutsideWindow() throws InterruptedException {
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
50,
Clock.systemDefaultZone(),
Clock.systemDefaultZone(),
TIMEOUT_THRESHOLD_MILLIS,
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
streamMediator,
responseStreamObserver);

Expand All @@ -102,4 +108,32 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {

verify(streamMediator).unsubscribe(liveStreamObserver);
}

private static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
}

private static InstantSource buildClockOutsideWindow(long testTime, long timeoutThresholdMillis) {
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
}

static class TestClock implements InstantSource {

private int index;
private final Long[] millis;

TestClock(Long... millis) {
this.millis = millis;
}

@Override
public long millis() {
return millis[index++];
}

@Override
public Instant instant() {
return null;
}
}
}

0 comments on commit b4b8245

Please sign in to comment.