From 3ad4efc2a207ef9f698904d92ae277e5f0abb134 Mon Sep 17 00:00:00 2001 From: Georgi Lazarov Date: Thu, 19 Dec 2024 11:23:58 +0200 Subject: [PATCH] chore: Fix Simulator unbounded memory growth (#406) Signed-off-by: georgi-l95 --- ...imulatorMappedConfigSourceInitializer.java | 1 + .../config/data/BlockStreamConfig.java | 15 ++++++++++++ .../simulator/config/data/StreamStatus.java | 15 ++++++------ .../impl/ConsumerStreamGrpcClientImpl.java | 20 ++++++++++++---- .../grpc/impl/ConsumerStreamObserver.java | 12 ++++++++-- .../impl/PublishStreamGrpcClientImpl.java | 11 ++++++--- .../grpc/impl/PublishStreamObserver.java | 14 ++++++++--- .../simulator/BlockStreamSimulatorTest.java | 3 ++- ...atorMappedConfigSourceInitializerTest.java | 1 + .../config/data/BlockStreamConfigTest.java | 10 ++++++++ .../config/data/StreamStatusTest.java | 23 ++++++++++++------- .../ConsumerStreamGrpcClientImplTest.java | 7 +++++- .../grpc/impl/ConsumerStreamObserverTest.java | 22 +++++++++++------- .../grpc/impl/PublishStreamObserverTest.java | 21 ++++++++++------- .../PositiveEndpointBehaviourTests.java | 8 +++---- 15 files changed, 133 insertions(+), 50 deletions(-) diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializer.java b/simulator/src/main/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializer.java index 33f1fb54f..92efd0c7f 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializer.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializer.java @@ -33,6 +33,7 @@ public final class SimulatorMappedConfigSourceInitializer { // Block stream configuration new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"), + new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"), new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"), new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"), new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"), diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index c0dea3bc1..34aba7c8d 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -25,6 +25,7 @@ * Defines the configuration data for the block stream in the Hedera Block Simulator. * * @param simulatorMode the mode of the simulator, in terms of publishing, consuming or both + * @param lastKnownStatusesCapacity the capacity of the last known statuses * @param delayBetweenBlockItems the delay in microseconds between streaming each block item * @param maxBlockItemsToStream the maximum number of block items to stream before stopping * @param streamingMode the mode of streaming for the block stream (e.g., time-based, count-based) @@ -34,6 +35,7 @@ @ConfigData("blockStream") public record BlockStreamConfig( @ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode, + @ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity, @ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems, @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream, @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, @@ -54,6 +56,7 @@ public static Builder builder() { */ public static class Builder { private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER; + private int lastKnownStatusesCapacity = 10; private int delayBetweenBlockItems = 1_500_000; private int maxBlockItemsToStream = 10_000; private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK; @@ -78,6 +81,17 @@ public Builder simulatorMode(SimulatorMode simulatorMode) { return this; } + /** + * Sets the capacity of the last known statuses. + * + * @param lastKnownStatusesCapacity the capacity + * @return this {@code Builder} instance + */ + public Builder lastKnownStatusesCapacity(int lastKnownStatusesCapacity) { + this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; + return this; + } + /** * Sets the delay between streaming each block item. * @@ -141,6 +155,7 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) { public BlockStreamConfig build() { return new BlockStreamConfig( simulatorMode, + lastKnownStatusesCapacity, delayBetweenBlockItems, maxBlockItemsToStream, streamingMode, diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java index 68abf9ec4..79919a29a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java @@ -19,7 +19,8 @@ import static com.hedera.block.common.utils.Preconditions.requireWhole; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; /** @@ -33,8 +34,8 @@ public record StreamStatus( long publishedBlocks, long consumedBlocks, - List lastKnownPublisherStatuses, - List lastKnownConsumersStatuses) { + Deque lastKnownPublisherStatuses, + Deque lastKnownConsumersStatuses) { /** * Creates a new {@link Builder} instance for constructing a {@code StreamStatus}. @@ -51,8 +52,8 @@ public static Builder builder() { public static class Builder { private long publishedBlocks = 0; private long consumedBlocks = 0; - private List lastKnownPublisherStatuses = new ArrayList<>(); - private List lastKnownConsumersStatuses = new ArrayList<>(); + private Deque lastKnownPublisherStatuses = new ArrayDeque<>(); + private Deque lastKnownConsumersStatuses = new ArrayDeque<>(); /** * Creates a new instance of the {@code Builder} class with default configuration values. @@ -93,7 +94,7 @@ public Builder consumedBlocks(long consumedBlocks) { */ public Builder lastKnownPublisherStatuses(List lastKnownPublisherStatuses) { requireNonNull(lastKnownPublisherStatuses); - this.lastKnownPublisherStatuses = new ArrayList<>(lastKnownPublisherStatuses); + this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses); return this; } @@ -105,7 +106,7 @@ public Builder lastKnownPublisherStatuses(List lastKnownPublisherStatuse */ public Builder lastKnownConsumersStatuses(List lastKnownConsumersStatuses) { requireNonNull(lastKnownConsumersStatuses); - this.lastKnownConsumersStatuses = new ArrayList<>(lastKnownConsumersStatuses); + this.lastKnownConsumersStatuses = new ArrayDeque<>(lastKnownConsumersStatuses); return this; } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java index 70e75ec83..ce0ad580a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java @@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull; import com.hedera.block.common.utils.Preconditions; +import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.data.GrpcConfig; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; import com.hedera.block.simulator.metrics.MetricsService; @@ -30,7 +31,8 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; -import java.util.ArrayList; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.concurrent.CountDownLatch; import javax.inject.Inject; @@ -43,6 +45,7 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient { // Configuration private final GrpcConfig grpcConfig; + private final BlockStreamConfig blockStreamConfig; // Service dependencies private final MetricsService metricsService; @@ -53,22 +56,28 @@ public class ConsumerStreamGrpcClientImpl implements ConsumerStreamGrpcClient { private StreamObserver consumerStreamObserver; // State - private final List lastKnownStatuses; + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; private CountDownLatch streamLatch; /** * Constructs a new ConsumerStreamGrpcClientImpl with the specified configuration and metrics service. * * @param grpcConfig The configuration for gRPC connection settings + * @param blockStreamConfig The configuration for the block stream * @param metricsService The service for recording consumption metrics * @throws NullPointerException if any parameter is null */ @Inject public ConsumerStreamGrpcClientImpl( - @NonNull final GrpcConfig grpcConfig, @NonNull final MetricsService metricsService) { + @NonNull final GrpcConfig grpcConfig, + @NonNull final BlockStreamConfig blockStreamConfig, + @NonNull final MetricsService metricsService) { this.grpcConfig = requireNonNull(grpcConfig); this.metricsService = requireNonNull(metricsService); - this.lastKnownStatuses = new ArrayList<>(); + this.blockStreamConfig = requireNonNull(blockStreamConfig); + this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity(); + this.lastKnownStatuses = new ArrayDeque<>(lastKnownStatusesCapacity); } @Override @@ -87,7 +96,8 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce Preconditions.requireWhole(endBlock); Preconditions.requireGreaterOrEqual(endBlock, startBlock); - consumerStreamObserver = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses); + consumerStreamObserver = + new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity); SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder() .setStartBlockNumber(startBlock) diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java index baeb5c921..b124dbbbb 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserver.java @@ -27,6 +27,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.util.Deque; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -42,7 +43,8 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses; + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; /** * Constructs a new ConsumerStreamObserver. @@ -50,15 +52,18 @@ public class ConsumerStreamObserver implements StreamObserver lastKnownStatuses) { + @NonNull final Deque lastKnownStatuses, + final int lastKnownStatusesCapacity) { this.metricsService = requireNonNull(metricsService); this.streamLatch = requireNonNull(streamLatch); this.lastKnownStatuses = requireNonNull(lastKnownStatuses); + this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; } /** @@ -70,6 +75,9 @@ public ConsumerStreamObserver( @Override public void onNext(SubscribeStreamResponse subscribeStreamResponse) { final SubscribeStreamResponse.ResponseCase responseType = subscribeStreamResponse.getResponseCase(); + if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) { + lastKnownStatuses.pollFirst(); + } lastKnownStatuses.add(subscribeStreamResponse.toString()); switch (responseType) { diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java index 4954f6b03..c2b4a58a2 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java @@ -37,7 +37,8 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; -import java.util.ArrayList; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; @@ -63,7 +64,8 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { // State private final AtomicBoolean streamEnabled; - private final List lastKnownStatuses = new ArrayList<>(); + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; /** * Creates a new PublishStreamGrpcClientImpl with the specified dependencies. @@ -84,6 +86,8 @@ public PublishStreamGrpcClientImpl( this.blockStreamConfig = requireNonNull(blockStreamConfig); this.metricsService = requireNonNull(metricsService); this.streamEnabled = requireNonNull(streamEnabled); + this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity(); + lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity); } /** @@ -95,7 +99,8 @@ public void init() { .usePlaintext() .build(); BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); + PublishStreamObserver publishStreamObserver = + new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); requestStreamObserver = stub.publishBlockStream(publishStreamObserver); lastKnownStatuses.clear(); } diff --git a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java index 94d53c93f..8365983d1 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java +++ b/simulator/src/main/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserver.java @@ -24,7 +24,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import java.util.List; +import java.util.Deque; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -36,19 +36,24 @@ public class PublishStreamObserver implements StreamObserver lastKnownStatuses; + private final int lastKnownStatusesCapacity; + private final Deque lastKnownStatuses; /** * Creates a new PublishStreamObserver instance. * * @param streamEnabled Controls whether streaming should continue * @param lastKnownStatuses List to store the most recent status messages + * @param lastKnownStatusesCapacity the capacity of the last known statuses * @throws NullPointerException if any parameter is null */ public PublishStreamObserver( - @NonNull final AtomicBoolean streamEnabled, @NonNull final List lastKnownStatuses) { + @NonNull final AtomicBoolean streamEnabled, + @NonNull final Deque lastKnownStatuses, + final int lastKnownStatusesCapacity) { this.streamEnabled = requireNonNull(streamEnabled); this.lastKnownStatuses = requireNonNull(lastKnownStatuses); + this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; } /** @@ -58,6 +63,9 @@ public PublishStreamObserver( */ @Override public void onNext(PublishStreamResponse publishStreamResponse) { + if (lastKnownStatuses.size() >= lastKnownStatusesCapacity) { + lastKnownStatuses.pollFirst(); + } lastKnownStatuses.add(publishStreamResponse.toString()); LOGGER.log(INFO, "Received Response: " + publishStreamResponse.toString()); } diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index f52fab222..8617f6eb9 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -300,7 +301,7 @@ void testGetStreamStatus() { assertNotNull(streamStatus, "StreamStatus should not be null"); assertEquals(expectedPublishedBlocks, streamStatus.publishedBlocks(), "Published blocks should match"); - assertEquals( + assertIterableEquals( expectedLastKnownStatuses, streamStatus.lastKnownPublisherStatuses(), "Last known statuses should match"); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java index 0fa9081c6..cde5199aa 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java @@ -53,6 +53,7 @@ class SimulatorMappedConfigSourceInitializerTest { // Block stream configuration new ConfigMapping("blockStream.simulatorMode", "BLOCK_STREAM_SIMULATOR_MODE"), + new ConfigMapping("blockStream.lastKnownStatusesCapacity", "BLOCK_STREAM_LAST_KNOWN_STATUSES_CAPACITY"), new ConfigMapping("blockStream.delayBetweenBlockItems", "BLOCK_STREAM_DELAY_BETWEEN_BLOCK_ITEMS"), new ConfigMapping("blockStream.maxBlockItemsToStream", "BLOCK_STREAM_MAX_BLOCK_ITEMS_TO_STREAM"), new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"), diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 194266cc7..19afb332d 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -78,6 +78,16 @@ void testSimulatorMode() { assertEquals(SimulatorMode.PUBLISHER, config.simulatorMode()); } + @Test + void testLastKnownStatusesCapacity() { + final int capacity = 20; + BlockStreamConfig config = getBlockStreamConfigBuilder() + .lastKnownStatusesCapacity(capacity) + .build(); + + assertEquals(capacity, config.lastKnownStatusesCapacity()); + } + @Test void testValidAbsolutePath() { // Setup valid folder path and generation mode diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java index 6bc772188..d9f311f5b 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/StreamStatusTest.java @@ -17,6 +17,7 @@ package com.hedera.block.simulator.config.data; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -54,11 +55,11 @@ void testBuilderWithValues() { assertEquals(10, streamStatus.publishedBlocks(), "publishedBlocks should be 10"); assertEquals(8, streamStatus.consumedBlocks(), "consumedBlocks should be 8"); - assertEquals( + assertIterableEquals( publisherStatuses, streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should match"); - assertEquals( + assertIterableEquals( consumerStatuses, streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should match"); } @@ -75,11 +76,11 @@ void testBuilderSetters() { assertEquals(5, streamStatus.publishedBlocks(), "publishedBlocks should be 5"); assertEquals(3, streamStatus.consumedBlocks(), "consumedBlocks should be 3"); - assertEquals( + assertIterableEquals( List.of("PubStatus"), streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should match"); - assertEquals( + assertIterableEquals( List.of("ConStatus"), streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should match"); @@ -120,8 +121,14 @@ void testEqualsAndHashCode() { .lastKnownConsumersStatuses(consumerStatuses) .build(); - assertEquals(streamStatus1, streamStatus2, "StreamStatus instances should be equal"); - assertEquals(streamStatus1.hashCode(), streamStatus2.hashCode(), "Hash codes should be equal"); + assertIterableEquals( + streamStatus1.lastKnownPublisherStatuses(), + streamStatus2.lastKnownPublisherStatuses(), + "lastKnownPublisherStatuses should match"); + assertIterableEquals( + streamStatus1.lastKnownConsumersStatuses(), + streamStatus2.lastKnownConsumersStatuses(), + "lastKnownConsumersStatuses should match"); } @Test @@ -209,11 +216,11 @@ void testBuilderChaining() { assertEquals(2, streamStatus.publishedBlocks(), "publishedBlocks should be 2"); assertEquals(2, streamStatus.consumedBlocks(), "consumedBlocks should be 2"); - assertEquals( + assertIterableEquals( List.of("PubStatus"), streamStatus.lastKnownPublisherStatuses(), "lastKnownPublisherStatuses should match"); - assertEquals( + assertIterableEquals( List.of("ConStatus"), streamStatus.lastKnownConsumersStatuses(), "lastKnownConsumersStatuses should match"); diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java index 089f073c1..316b26fa8 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.hedera.block.simulator.TestUtils; +import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.data.GrpcConfig; import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient; import com.hedera.block.simulator.metrics.MetricsService; @@ -51,6 +52,9 @@ public class ConsumerStreamGrpcClientImplTest { @Mock private GrpcConfig grpcConfig; + @Mock + private BlockStreamConfig blockStreamConfig; + private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl; private Server server; @@ -102,10 +106,11 @@ public void subscribeBlockStream( when(grpcConfig.serverAddress()).thenReturn("localhost"); when(grpcConfig.port()).thenReturn(serverPort); + when(blockStreamConfig.lastKnownStatusesCapacity()).thenReturn(10); final Configuration config = TestUtils.getTestConfiguration(); final MetricsService metricsService = new MetricsServiceImpl(getTestMetrics(config)); - consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, metricsService); + consumerStreamGrpcClientImpl = new ConsumerStreamGrpcClientImpl(grpcConfig, blockStreamConfig, metricsService); consumerStreamGrpcClientImpl.init(); } diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java index dd665d1e4..4f8ed18da 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/ConsumerStreamObserverTest.java @@ -36,8 +36,7 @@ import com.hedera.hapi.block.stream.protoc.BlockProof; import com.swirlds.config.api.Configuration; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.ArrayDeque; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,8 +45,9 @@ class ConsumerStreamObserverTest { private MetricsService metricsService; private CountDownLatch streamLatch; - private List lastKnownStatuses; + private ArrayDeque lastKnownStatuses; private ConsumerStreamObserver observer; + private int lastKnownStatusesCapacity; @BeforeEach void setUp() throws IOException { @@ -55,18 +55,24 @@ void setUp() throws IOException { metricsService = spy(new MetricsServiceImpl(getTestMetrics(config))); streamLatch = mock(CountDownLatch.class); - List lastKnownStatuses = new ArrayList<>(); + ArrayDeque lastKnownStatuses = new ArrayDeque<>(); + lastKnownStatusesCapacity = 10; - observer = new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses); + observer = + new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity); } @Test void testConstructorWithNullArguments() { assertThrows( - NullPointerException.class, () -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses)); + NullPointerException.class, + () -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity)); assertThrows( - NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses)); - assertThrows(NullPointerException.class, () -> new ConsumerStreamObserver(metricsService, streamLatch, null)); + NullPointerException.class, + () -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses, lastKnownStatusesCapacity)); + assertThrows( + NullPointerException.class, + () -> new ConsumerStreamObserver(metricsService, streamLatch, null, lastKnownStatusesCapacity)); } @Test diff --git a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java index a41e9c590..37c82a8d6 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/grpc/impl/PublishStreamObserverTest.java @@ -21,8 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.hedera.hapi.block.protoc.PublishStreamResponse; -import java.util.ArrayList; -import java.util.List; +import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -32,8 +31,10 @@ class PublishStreamObserverTest { void onNext() { PublishStreamResponse response = PublishStreamResponse.newBuilder().build(); AtomicBoolean streamEnabled = new AtomicBoolean(true); - List lastKnownStatuses = new ArrayList<>(); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); + ArrayDeque lastKnownStatuses = new ArrayDeque<>(); + final int lastKnownStatusesCapacity = 10; + PublishStreamObserver publishStreamObserver = + new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); publishStreamObserver.onNext(response); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted"); @@ -43,8 +44,10 @@ void onNext() { @Test void onError() { AtomicBoolean streamEnabled = new AtomicBoolean(true); - List lastKnownStatuses = new ArrayList<>(); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); + ArrayDeque lastKnownStatuses = new ArrayDeque<>(); + final int lastKnownStatusesCapacity = 10; + PublishStreamObserver publishStreamObserver = + new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); publishStreamObserver.onError(new Throwable()); assertFalse(streamEnabled.get(), "streamEnabled should be set to false after onError"); @@ -54,8 +57,10 @@ void onError() { @Test void onCompleted() { AtomicBoolean streamEnabled = new AtomicBoolean(true); - List lastKnownStatuses = new ArrayList<>(); - PublishStreamObserver publishStreamObserver = new PublishStreamObserver(streamEnabled, lastKnownStatuses); + ArrayDeque lastKnownStatuses = new ArrayDeque<>(); + final int lastKnownStatusesCapacity = 10; + PublishStreamObserver publishStreamObserver = + new PublishStreamObserver(streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); publishStreamObserver.onCompleted(); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted"); diff --git a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java index d51b7b592..de8e3194f 100644 --- a/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java +++ b/suites/src/main/java/com/hedera/block/suites/grpc/positive/PositiveEndpointBehaviourTests.java @@ -16,7 +16,6 @@ package com.hedera.block.suites.grpc.positive; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import com.hedera.block.simulator.BlockStreamSimulatorApp; @@ -74,9 +73,10 @@ void verifyPublishBlockStreamEndpoint() throws IOException, InterruptedException blockStreamSimulatorApp.stop(); StreamStatus streamStatus = blockStreamSimulatorApp.getStreamStatus(); assertTrue(streamStatus.publishedBlocks() > 0); - assertEquals( - streamStatus.publishedBlocks(), - streamStatus.lastKnownPublisherStatuses().size()); + // We just need to make sure that number of published blocks is equal or greater than the statuses. Statuses are + // tracked in a queue to avoid unnecessary memory usage, therefore will always be less or equal to published. + assertTrue(streamStatus.publishedBlocks() + >= streamStatus.lastKnownPublisherStatuses().size()); // Verify each status contains the word "acknowledgement" streamStatus